Skip to main content

limbo_core/io/
unix.rs

1use crate::error::LimboError;
2use crate::io::common;
3use crate::Result;
4
5use super::{Completion, File, MemoryIO, OpenFlags, IO};
6use crate::io::clock::{Clock, Instant};
7use polling::{Event, Events, Poller};
8use rustix::{
9    fd::{AsFd, AsRawFd},
10    fs::{self, FlockOperation, OFlags, OpenOptionsExt},
11    io::Errno,
12};
13use std::{
14    cell::{RefCell, UnsafeCell},
15    mem::MaybeUninit,
16};
17use std::{
18    io::{ErrorKind, Read, Seek, Write},
19    sync::Arc,
20};
21use tracing::{debug, trace};
22
/// Owner of the `Callbacks` table shared between `UnixIO` and the `UnixFile`s it opens.
///
/// Mutation goes through a bare `UnsafeCell`; there is no runtime borrow tracking.
struct OwnedCallbacks(UnsafeCell<Callbacks>);
// We assume locking at the IO level is done by the user: nothing inside this type
// synchronises access, so these impls rely on callers serialising it.
unsafe impl Send for OwnedCallbacks {}
unsafe impl Sync for OwnedCallbacks {}
/// A `UnixFile`'s handle to the `Callbacks` table owned by `UnixIO`, used by `pread`/`pwrite`
/// to queue operations that returned `EAGAIN`.
struct BorrowedCallbacks<'io>(UnsafeCell<&'io mut Callbacks>);
28
29impl OwnedCallbacks {
30    fn new() -> Self {
31        Self(UnsafeCell::new(Callbacks::new()))
32    }
33    fn as_mut<'io>(&self) -> &'io mut Callbacks {
34        unsafe { &mut *self.0.get() }
35    }
36
37    fn is_empty(&self) -> bool {
38        self.as_mut().inline_count == 0
39    }
40
41    fn remove(&self, fd: usize) -> Option<CompletionCallback> {
42        let callbacks = unsafe { &mut *self.0.get() };
43        callbacks.remove(fd)
44    }
45}
46
47impl BorrowedCallbacks<'_> {
48    fn insert(&self, fd: usize, callback: CompletionCallback) {
49        let callbacks = unsafe { &mut *self.0.get() };
50        callbacks.insert(fd, callback);
51    }
52}
53
/// Reusable buffer of readiness events, cleared and refilled on every `UnixIO::run_once`.
struct EventsHandler(UnsafeCell<Events>);
55
56impl EventsHandler {
57    fn new() -> Self {
58        Self(UnsafeCell::new(Events::new()))
59    }
60
61    fn clear(&self) {
62        let events = unsafe { &mut *self.0.get() };
63        events.clear();
64    }
65
66    fn iter(&self) -> impl Iterator<Item = Event> {
67        let events = unsafe { &*self.0.get() };
68        events.iter()
69    }
70
71    fn as_mut<'io>(&self) -> &'io mut Events {
72        unsafe { &mut *self.0.get() }
73    }
74}
/// Owner of the `Poller` that `UnixIO::run_once` waits on.
struct PollHandler(UnsafeCell<Poller>);
/// A `UnixFile`'s handle to the `UnixIO` poller, used to register descriptors whose
/// `pread`/`pwrite` returned `EAGAIN`.
struct BorrowedPollHandler<'io>(UnsafeCell<&'io mut Poller>);
77
78impl BorrowedPollHandler<'_> {
79    fn add(&self, fd: &rustix::fd::BorrowedFd, event: Event) -> Result<()> {
80        let poller = unsafe { &mut *self.0.get() };
81        unsafe { poller.add(fd, event)? }
82        Ok(())
83    }
84}
85
86impl PollHandler {
87    fn new() -> Self {
88        Self(UnsafeCell::new(
89            Poller::new().expect("failed to create poller"),
90        ))
91    }
92    fn wait(&self, events: &mut Events, timeout: Option<std::time::Duration>) -> Result<()> {
93        let poller = unsafe { &mut *self.0.get() };
94        poller.wait(events, timeout)?;
95        Ok(())
96    }
97
98    fn as_mut<'io>(&self) -> &'io mut Poller {
99        unsafe { &mut *self.0.get() }
100    }
101}
102
/// A queued operation keyed by the raw file descriptor it is waiting on.
type CallbackEntry = (usize, CompletionCallback);

/// Number of queued operations kept in `Callbacks`' inline array before spilling to a `Vec`.
const FD_INLINE_SIZE: usize = 32;
106
107struct Callbacks {
108    inline_entries: [MaybeUninit<(usize, CompletionCallback)>; FD_INLINE_SIZE],
109    heap_entries: Vec<CallbackEntry>,
110    inline_count: usize,
111}
112
113impl Callbacks {
114    fn new() -> Self {
115        Self {
116            inline_entries: [const { MaybeUninit::uninit() }; FD_INLINE_SIZE],
117            heap_entries: Vec::new(),
118            inline_count: 0,
119        }
120    }
121
122    fn insert(&mut self, fd: usize, callback: CompletionCallback) {
123        if self.inline_count < FD_INLINE_SIZE {
124            self.inline_entries[self.inline_count].write((fd, callback));
125            self.inline_count += 1;
126        } else {
127            self.heap_entries.push((fd, callback));
128        }
129    }
130
131    fn remove(&mut self, fd: usize) -> Option<CompletionCallback> {
132        if let Some(pos) = self.find_inline(fd) {
133            let (_, callback) = unsafe { self.inline_entries[pos].assume_init_read() };
134
135            // if not the last element, move the last valid entry into this position
136            if pos < self.inline_count - 1 {
137                let last_valid =
138                    unsafe { self.inline_entries[self.inline_count - 1].assume_init_read() };
139                self.inline_entries[pos].write(last_valid);
140            }
141
142            self.inline_count -= 1;
143            return Some(callback);
144        }
145
146        if let Some(pos) = self.heap_entries.iter().position(|&(k, _)| k == fd) {
147            return Some(self.heap_entries.swap_remove(pos).1);
148        }
149        None
150    }
151
152    fn find_inline(&self, fd: usize) -> Option<usize> {
153        (0..self.inline_count)
154            .find(|&i| unsafe { self.inline_entries[i].assume_init_ref().0 == fd })
155    }
156}
157
158impl Drop for Callbacks {
159    fn drop(&mut self) {
160        for i in 0..self.inline_count {
161            unsafe { self.inline_entries[i].assume_init_drop() };
162        }
163    }
164}
165
/// Syscall-based IO backend that falls back to a `polling::Poller` for operations
/// that return `EAGAIN`.
///
/// UnixIO lives longer than any of the files it creates, so it is
/// safe to store references to its internals in the UnixFiles.
/// NOTE(review): this is a caller obligation, not enforced by the type system: the `'io`
/// references are produced by `as_mut`, whose lifetime is not tied to `&self`.
pub struct UnixIO {
    poller: PollHandler,
    events: EventsHandler,
    callbacks: OwnedCallbacks,
}

// NOTE(review): the fields hand out unsynchronised `&mut` views; these impls assume callers
// serialise all IO on a given `UnixIO`. Confirm.
unsafe impl Send for UnixIO {}
unsafe impl Sync for UnixIO {}
176
177impl UnixIO {
178    #[cfg(feature = "fs")]
179    pub fn new() -> Result<Self> {
180        debug!("Using IO backend 'syscall'");
181        Ok(Self {
182            poller: PollHandler::new(),
183            events: EventsHandler::new(),
184            callbacks: OwnedCallbacks::new(),
185        })
186    }
187}
188
189impl Clock for UnixIO {
190    fn now(&self) -> Instant {
191        let now = chrono::Local::now();
192        Instant {
193            secs: now.timestamp(),
194            micros: now.timestamp_subsec_micros(),
195        }
196    }
197}
198
199impl IO for UnixIO {
200    fn open_file(&self, path: &str, flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
201        trace!("open_file(path = {})", path);
202        let mut file = std::fs::File::options();
203        file.read(true).custom_flags(OFlags::NONBLOCK.bits() as i32);
204
205        if !flags.contains(OpenFlags::ReadOnly) {
206            file.write(true);
207            file.create(flags.contains(OpenFlags::Create));
208        }
209
210        let file = file.open(path)?;
211
212        #[allow(clippy::arc_with_non_send_sync)]
213        let unix_file = Arc::new(UnixFile {
214            file: Arc::new(RefCell::new(file)),
215            poller: BorrowedPollHandler(self.poller.as_mut().into()),
216            callbacks: BorrowedCallbacks(self.callbacks.as_mut().into()),
217        });
218        if std::env::var(common::ENV_DISABLE_FILE_LOCK).is_err() {
219            unix_file.lock_file(!flags.contains(OpenFlags::ReadOnly))?;
220        }
221        Ok(unix_file)
222    }
223
224    fn run_once(&self) -> Result<()> {
225        if self.callbacks.is_empty() {
226            return Ok(());
227        }
228        self.events.clear();
229        trace!("run_once() waits for events");
230        self.poller.wait(self.events.as_mut(), None)?;
231
232        for event in self.events.iter() {
233            if let Some(cf) = self.callbacks.remove(event.key) {
234                let result = match cf {
235                    CompletionCallback::Read(ref file, ref c, pos) => {
236                        let mut file = file.borrow_mut();
237                        let r = c.as_read();
238                        let mut buf = r.buf_mut();
239                        file.seek(std::io::SeekFrom::Start(pos as u64))?;
240                        file.read(buf.as_mut_slice())
241                    }
242                    CompletionCallback::Write(ref file, _, ref buf, pos) => {
243                        let mut file = file.borrow_mut();
244                        let buf = buf.borrow();
245                        file.seek(std::io::SeekFrom::Start(pos as u64))?;
246                        file.write(buf.as_slice())
247                    }
248                };
249                match result {
250                    Ok(n) => match &cf {
251                        CompletionCallback::Read(_, ref c, _) => c.complete(0),
252                        CompletionCallback::Write(_, ref c, _, _) => c.complete(n as i32),
253                    },
254                    Err(e) => return Err(e.into()),
255                }
256            }
257        }
258        Ok(())
259    }
260
261    fn wait_for_completion(&self, c: Arc<Completion>) -> Result<()> {
262        while !c.is_completed() {
263            self.run_once()?;
264        }
265        Ok(())
266    }
267
268    fn generate_random_number(&self) -> i64 {
269        let mut buf = [0u8; 8];
270        getrandom::getrandom(&mut buf).expect("getrandom failed");
271        i64::from_ne_bytes(buf)
272    }
273
274    fn get_memory_io(&self) -> Arc<MemoryIO> {
275        Arc::new(MemoryIO::new())
276    }
277}
278
/// An operation that returned `EAGAIN` and is retried by `UnixIO::run_once` once the poller
/// reports its descriptor ready.
enum CompletionCallback {
    /// `(file, completion, pos)`: read into the completion's buffer at byte offset `pos`.
    Read(Arc<RefCell<std::fs::File>>, Arc<Completion>, usize),
    /// `(file, completion, buffer, pos)`: write `buffer` at byte offset `pos`.
    Write(
        Arc<RefCell<std::fs::File>>,
        Arc<Completion>,
        Arc<RefCell<crate::Buffer>>,
        usize,
    ),
}
288
/// A file opened by `UnixIO`, holding borrowed handles to the IO's poller and callback table.
pub struct UnixFile<'io> {
    // Shared (via `Arc`) with any queued `CompletionCallback` so a retried operation can
    // reach the same file.
    #[allow(clippy::arc_with_non_send_sync)]
    file: Arc<RefCell<std::fs::File>>,
    poller: BorrowedPollHandler<'io>,
    callbacks: BorrowedCallbacks<'io>,
}
// NOTE(review): asserted without synchronisation (`RefCell` is not `Sync`); relies on a
// `UnixFile` never being used from several threads at once. Confirm.
unsafe impl Send for UnixFile<'_> {}
unsafe impl Sync for UnixFile<'_> {}
297
298impl File for UnixFile<'_> {
299    fn lock_file(&self, exclusive: bool) -> Result<()> {
300        let fd = self.file.borrow();
301        let fd = fd.as_fd();
302        // F_SETLK is a non-blocking lock. The lock will be released when the file is closed
303        // or the process exits or after an explicit unlock.
304        fs::fcntl_lock(
305            fd,
306            if exclusive {
307                FlockOperation::NonBlockingLockExclusive
308            } else {
309                FlockOperation::NonBlockingLockShared
310            },
311        )
312        .map_err(|e| {
313            let io_error = std::io::Error::from(e);
314            let message = match io_error.kind() {
315                ErrorKind::WouldBlock => {
316                    "Failed locking file. File is locked by another process".to_string()
317                }
318                _ => format!("Failed locking file, {}", io_error),
319            };
320            LimboError::LockingError(message)
321        })?;
322
323        Ok(())
324    }
325
326    fn unlock_file(&self) -> Result<()> {
327        let fd = self.file.borrow();
328        let fd = fd.as_fd();
329        fs::fcntl_lock(fd, FlockOperation::NonBlockingUnlock).map_err(|e| {
330            LimboError::LockingError(format!(
331                "Failed to release file lock: {}",
332                std::io::Error::from(e)
333            ))
334        })?;
335        Ok(())
336    }
337
338    fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
339        let file = self.file.borrow();
340        let result = {
341            let r = c.as_read();
342            let mut buf = r.buf_mut();
343            rustix::io::pread(file.as_fd(), buf.as_mut_slice(), pos as u64)
344        };
345        match result {
346            Ok(n) => {
347                trace!("pread n: {}", n);
348                // Read succeeded immediately
349                c.complete(0);
350                Ok(())
351            }
352            Err(Errno::AGAIN) => {
353                trace!("pread blocks");
354                // Would block, set up polling
355                let fd = file.as_raw_fd();
356                self.poller
357                    .add(&file.as_fd(), Event::readable(fd as usize))?;
358                {
359                    self.callbacks.insert(
360                        fd as usize,
361                        CompletionCallback::Read(self.file.clone(), c, pos),
362                    );
363                }
364                Ok(())
365            }
366            Err(e) => Err(e.into()),
367        }
368    }
369
370    fn pwrite(
371        &self,
372        pos: usize,
373        buffer: Arc<RefCell<crate::Buffer>>,
374        c: Arc<Completion>,
375    ) -> Result<()> {
376        let file = self.file.borrow();
377        let result = {
378            let buf = buffer.borrow();
379            rustix::io::pwrite(file.as_fd(), buf.as_slice(), pos as u64)
380        };
381        match result {
382            Ok(n) => {
383                trace!("pwrite n: {}", n);
384                // Read succeeded immediately
385                c.complete(n as i32);
386                Ok(())
387            }
388            Err(Errno::AGAIN) => {
389                trace!("pwrite blocks");
390                // Would block, set up polling
391                let fd = file.as_raw_fd();
392                self.poller
393                    .add(&file.as_fd(), Event::readable(fd as usize))?;
394                self.callbacks.insert(
395                    fd as usize,
396                    CompletionCallback::Write(self.file.clone(), c, buffer.clone(), pos),
397                );
398                Ok(())
399            }
400            Err(e) => Err(e.into()),
401        }
402    }
403
404    fn sync(&self, c: Arc<Completion>) -> Result<()> {
405        let file = self.file.borrow();
406        let result = fs::fsync(file.as_fd());
407        match result {
408            Ok(()) => {
409                trace!("fsync");
410                c.complete(0);
411                Ok(())
412            }
413            Err(e) => Err(e.into()),
414        }
415    }
416
417    fn size(&self) -> Result<u64> {
418        let file = self.file.borrow();
419        Ok(file.metadata()?.len())
420    }
421
422    fn truncate(&self, len: usize, c: Arc<Completion>) -> Result<()> {
423        let file = self.file.borrow();
424        file.set_len(len as u64)?;
425        c.complete(0);
426        Ok(())
427    }
428}
429
430impl Drop for UnixFile<'_> {
431    fn drop(&mut self) {
432        self.unlock_file().expect("Failed to unlock file");
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Delegates to the shared `common::tests` check of the same name, using `UnixIO::new`
    /// as the IO constructor.
    #[test]
    fn test_multiple_processes_cannot_open_file() {
        let make_io = UnixIO::new;
        common::tests::test_multiple_processes_cannot_open_file(make_io);
    }
}