1use crate::error::LimboError;
2use crate::io::common;
3use crate::Result;
4
5use super::{Completion, File, MemoryIO, OpenFlags, IO};
6use crate::io::clock::{Clock, Instant};
7use polling::{Event, Events, Poller};
8use rustix::{
9 fd::{AsFd, AsRawFd},
10 fs::{self, FlockOperation, OFlags, OpenOptionsExt},
11 io::Errno,
12};
13use std::{
14 cell::{RefCell, UnsafeCell},
15 mem::MaybeUninit,
16};
17use std::{
18 io::{ErrorKind, Read, Seek, Write},
19 sync::Arc,
20};
21use tracing::{debug, trace};
22
23struct OwnedCallbacks(UnsafeCell<Callbacks>);
24unsafe impl Send for OwnedCallbacks {}
26unsafe impl Sync for OwnedCallbacks {}
27struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>);
28
29impl OwnedCallbacks {
30 fn new() -> Self {
31 Self(UnsafeCell::new(Callbacks::new()))
32 }
33 fn as_mut<'io>(&self) -> &'io mut Callbacks {
34 unsafe { &mut *self.0.get() }
35 }
36
37 fn is_empty(&self) -> bool {
38 self.as_mut().inline_count == 0
39 }
40
41 fn remove(&self, fd: usize) -> Option<CompletionCallback> {
42 let callbacks = unsafe { &mut *self.0.get() };
43 callbacks.remove(fd)
44 }
45}
46
47impl BorrowedCallbacks<'_> {
48 fn insert(&self, fd: usize, callback: CompletionCallback) {
49 let callbacks = unsafe { &mut *self.0.get() };
50 callbacks.insert(fd, callback);
51 }
52}
53
54struct EventsHandler(UnsafeCell<Events>);
55
56impl EventsHandler {
57 fn new() -> Self {
58 Self(UnsafeCell::new(Events::new()))
59 }
60
61 fn clear(&self) {
62 let events = unsafe { &mut *self.0.get() };
63 events.clear();
64 }
65
66 fn iter(&self) -> impl Iterator<Item = Event> {
67 let events = unsafe { &*self.0.get() };
68 events.iter()
69 }
70
71 fn as_mut<'io>(&self) -> &'io mut Events {
72 unsafe { &mut *self.0.get() }
73 }
74}
75struct PollHandler(UnsafeCell<Poller>);
76struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>);
77
78impl BorrowedPollHandler<'_> {
79 fn add(&self, fd: &rustix::fd::BorrowedFd, event: Event) -> Result<()> {
80 let poller = unsafe { &mut *self.0.get() };
81 unsafe { poller.add(fd, event)? }
82 Ok(())
83 }
84}
85
86impl PollHandler {
87 fn new() -> Self {
88 Self(UnsafeCell::new(
89 Poller::new().expect("failed to create poller"),
90 ))
91 }
92 fn wait(&self, events: &mut Events, timeout: Option<std::time::Duration>) -> Result<()> {
93 let poller = unsafe { &mut *self.0.get() };
94 poller.wait(events, timeout)?;
95 Ok(())
96 }
97
98 fn as_mut<'io>(&self) -> &'io mut Poller {
99 unsafe { &mut *self.0.get() }
100 }
101}
102
103type CallbackEntry = (usize, CompletionCallback);
104
105const FD_INLINE_SIZE: usize = 32;
106
107struct Callbacks {
108 inline_entries: [MaybeUninit<(usize, CompletionCallback)>; FD_INLINE_SIZE],
109 heap_entries: Vec<CallbackEntry>,
110 inline_count: usize,
111}
112
113impl Callbacks {
114 fn new() -> Self {
115 Self {
116 inline_entries: [const { MaybeUninit::uninit() }; FD_INLINE_SIZE],
117 heap_entries: Vec::new(),
118 inline_count: 0,
119 }
120 }
121
122 fn insert(&mut self, fd: usize, callback: CompletionCallback) {
123 if self.inline_count < FD_INLINE_SIZE {
124 self.inline_entries[self.inline_count].write((fd, callback));
125 self.inline_count += 1;
126 } else {
127 self.heap_entries.push((fd, callback));
128 }
129 }
130
131 fn remove(&mut self, fd: usize) -> Option<CompletionCallback> {
132 if let Some(pos) = self.find_inline(fd) {
133 let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() };
134
135 if pos < self.inline_count - 1 {
137 let last_valid =
138 unsafe { self.inline_entries[self.inline_count - 1].assume_init_read() };
139 self.inline_entries[pos].write(last_valid);
140 }
141
142 self.inline_count -= 1;
143 return Some(callback);
144 }
145
146 if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) {
147 return Some(self.heap_entries.swap_remove(pos).1);
148 }
149 None
150 }
151
152 fn find_inline(&self, fd: usize) -> Option<usize> {
153 (0..self.inline_count)
154 .find(|&i| unsafe { self.inline_entries[i].assume_init_ref().0 == fd })
155 }
156}
157
158impl Drop for Callbacks {
159 fn drop(&mut self) {
160 for i in 0..self.inline_count {
161 unsafe { self.inline_entries[i].assume_init_drop() };
162 }
163 }
164}
165
166pub struct UnixIO {
169 poller: PollHandler,
170 events: EventsHandler,
171 callbacks: OwnedCallbacks,
172}
173
174unsafe impl Send for UnixIO {}
175unsafe impl Sync for UnixIO {}
176
177impl UnixIO {
178 #[cfg(feature = "fs")]
179 pub fn new() -> Result<Self> {
180 debug!("Using IO backend 'syscall'");
181 Ok(Self {
182 poller: PollHandler::new(),
183 events: EventsHandler::new(),
184 callbacks: OwnedCallbacks::new(),
185 })
186 }
187}
188
189impl Clock for UnixIO {
190 fn now(&self) -> Instant {
191 let now = chrono::Local::now();
192 Instant {
193 secs: now.timestamp(),
194 micros: now.timestamp_subsec_micros(),
195 }
196 }
197}
198
199impl IO for UnixIO {
200 fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
201 trace!("open_file(path = {})", path);
202 let mut file = std::fs::File::options();
203 file.read(true).custom_flags(OFlags::NONBLOCK.bits() as i32);
204
205 if !flags.contains(OpenFlags::ReadOnly) {
206 file.write(true);
207 file.create(flags.contains(OpenFlags::Create));
208 }
209
210 let file = file.open(path)?;
211
212 #[allow(clippy::arc_with_non_send_sync)]
213 let unix_file = Arc::new(UnixFile {
214 file: Arc::new(RefCell::new(file)),
215 poller: BorrowedPollHandler(self.poller.as_mut().into()),
216 callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()),
217 });
218 if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
219 unix_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
220 }
221 Ok(unix_file)
222 }
223
224 fn run_once(&self) -> Result<()> {
225 if self.callbacks.is_empty() {
226 return Ok(());
227 }
228 self.events.clear();
229 trace!("run_once() waits for events");
230 self.poller.wait(self.events.as_mut(), None)?;
231
232 for event in self.events.iter() {
233 if let Some(cf) = self.callbacks.remove(event.key) {
234 let result = match cf {
235 CompletionCallback::Read(ref file, ref c, pos) => {
236 let mut file = file.borrow_mut();
237 let r = c.as_read();
238 let mut buf = r.buf_mut();
239 file.seek(std::io::SeekFrom::Start(pos as u64))?;
240 file.read(buf.as_mut_slice())
241 }
242 CompletionCallback::Write(ref file, _, ref buf, pos) => {
243 let mut file = file.borrow_mut();
244 let buf = buf.borrow();
245 file.seek(std::io::SeekFrom::Start(pos as u64))?;
246 file.write(buf.as_slice())
247 }
248 };
249 match result {
250 Ok(n) => match &cf {
251 CompletionCallback::Read(_, ref c, _) => c.complete(0),
252 CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32),
253 },
254 Err(e) => return Err(e.into()),
255 }
256 }
257 }
258 Ok(())
259 }
260
261 fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
262 while !c.is_completed() {
263 self.run_once()?;
264 }
265 Ok(())
266 }
267
268 fn generate_random_number(&self) -> i64 {
269 let mut buf = [0u8; 8];
270 getrandom::getrandom(&mut buf).expect("getrandom failed");
271 i64::from_ne_bytes(buf)
272 }
273
274 fn get_memory_io(&self) -> Arc<MemoryIO> {
275 Arc::new(MemoryIO::new())
276 }
277}
278
279enum CompletionCallback {
280 Read(Arc<RefCell<std::fs::File>>, Arc<Completion>, usize),
281 Write(
282 Arc<RefCell<std::fs::File>>,
283 Arc<Completion>,
284 Arc<RefCell<crate::Buffer>>,
285 usize,
286 ),
287}
288
289pub struct UnixFile<'io> {
290 #[allow(clippy::arc_with_non_send_sync)]
291 file: Arc<RefCell<std::fs::File>>,
292 poller: BorrowedPollHandler<'io>,
293 callbacks: BorrowedCallbacks<'io>,
294}
295unsafe impl Send for UnixFile<'_> {}
296unsafe impl Sync for UnixFile<'_> {}
297
298impl File for UnixFile<'_> {
299 fn lock_file(&self, exclusive: bool) -> Result<()> {
300 let fd = self.file.borrow();
301 let fd = fd.as_fd();
302 fs::fcntl_lock(
305 fd,
306 if exclusive {
307 FlockOperation::NonBlockingLockExclusive
308 } else {
309 FlockOperation::NonBlockingLockShared
310 },
311 )
312 .map_err(|e| {
313 let io_error = std::io::Error::from(e);
314 let message = match io_error.kind() {
315 ErrorKind::WouldBlock => {
316 "Failed locking file. File is locked by another process".to_string()
317 }
318 _ => format!("Failed locking file, {}", io_error),
319 };
320 LimboError::LockingError(message)
321 })?;
322
323 Ok(())
324 }
325
326 fn unlock_file(&self) -> Result<()> {
327 let fd = self.file.borrow();
328 let fd = fd.as_fd();
329 fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| {
330 LimboError::LockingError(format!(
331 "Failed to release file lock: {}",
332 std::io::Error::from(e)
333 ))
334 })?;
335 Ok(())
336 }
337
338 fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
339 let file = self.file.borrow();
340 let result = {
341 let r = c.as_read();
342 let mut buf = r.buf_mut();
343 rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
344 };
345 match result {
346 Ok(n) => {
347 trace!("pread n: {}", n);
348 c.complete(0);
350 Ok(())
351 }
352 Err(Errno::AGAIN) => {
353 trace!("pread blocks");
354 let fd = file.as_raw_fd();
356 self.poller
357 .add(&file.as_fd(), Event::readable(fd as usize))?;
358 {
359 self.callbacks.insert(
360 fd as usize,
361 CompletionCallback::Read(self.file.clone(), c, pos),
362 );
363 }
364 Ok(())
365 }
366 Err(e) => Err(e.into()),
367 }
368 }
369
370 fn pwrite(
371 &self,
372 pos: usize,
373 buffer: Arc<RefCell<crate::Buffer>>,
374 c: Arc<Completion>,
375 ) -> Result<()> {
376 let file = self.file.borrow();
377 let result = {
378 let buf = buffer.borrow();
379 rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64)
380 };
381 match result {
382 Ok(n) => {
383 trace!("pwrite n: {}", n);
384 c.complete(n as i32);
386 Ok(())
387 }
388 Err(Errno::AGAIN) => {
389 trace!("pwrite blocks");
390 let fd = file.as_raw_fd();
392 self.poller
393 .add(&file.as_fd(), Event::readable(fd as usize))?;
394 self.callbacks.insert(
395 fd as usize,
396 CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos),
397 );
398 Ok(())
399 }
400 Err(e) => Err(e.into()),
401 }
402 }
403
404 fn sync(&self, c: Arc<Completion>) -> Result<()> {
405 let file = self.file.borrow();
406 let result = fs::fsync(file.as_fd());
407 match result {
408 Ok(()) => {
409 trace!("fsync");
410 c.complete(0);
411 Ok(())
412 }
413 Err(e) => Err(e.into()),
414 }
415 }
416
417 fn size(&self) -> Result<u64> {
418 let file = self.file.borrow();
419 Ok(file.metadata()?.len())
420 }
421
422 fn truncate(&self, len: usize, c: Arc<Completion>) -> Result<()> {
423 let file = self.file.borrow();
424 file.set_len(len as u64)?;
425 c.complete(0);
426 Ok(())
427 }
428}
429
430impl Drop for UnixFile<'_> {
431 fn drop(&mut self) {
432 self.unlock_file().expect("Failed to unlock file");
433 }
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared multi-process locking test against the `UnixIO` backend.
    #[test]
    fn test_multiple_processes_cannot_open_file() {
        let make_io = UnixIO::new;
        common::tests::test_multiple_processes_cannot_open_file(make_io);
    }
}