Skip to main content

limbo_core/io/
memory.rs

1use super::{Buffer, Clock, Completion, File, OpenFlags, IO};
2use crate::Result;
3
4use crate::io::clock::Instant;
5use std::{
6    cell::{Cell, RefCell, UnsafeCell},
7    collections::BTreeMap,
8    sync::Arc,
9};
10use tracing::debug;
11
/// In-memory I/O backend.
///
/// The type has no fields, so the compiler already derives `Send` and `Sync`
/// for it; no manual `unsafe impl` is required.
pub struct MemoryIO {}
14
// TODO: page size flag
/// Size, in bytes, of one in-memory page.
const PAGE_SIZE: usize = 4 * 1024;

/// A single heap-allocated page of exactly `PAGE_SIZE` bytes.
type MemPage = Box<[u8; PAGE_SIZE]>;
18
19impl MemoryIO {
20    #[allow(clippy::arc_with_non_send_sync)]
21    pub fn new() -> Self {
22        debug!("Using IO backend 'memory'");
23        Self {}
24    }
25}
26
27impl Default for MemoryIO {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl Clock for MemoryIO {
34    fn now(&self) -> Instant {
35        let now = chrono::Local::now();
36        Instant {
37            secs: now.timestamp(),
38            micros: now.timestamp_subsec_micros(),
39        }
40    }
41}
42
43impl IO for MemoryIO {
44    fn open_file(&self, _path: &str, _flags: OpenFlags, _direct: bool) -> Result<Arc<dyn File>> {
45        Ok(Arc::new(MemoryFile {
46            pages: BTreeMap::new().into(),
47            size: 0.into(),
48        }))
49    }
50
51    fn run_once(&self) -> Result<()> {
52        // nop
53        Ok(())
54    }
55
56    fn wait_for_completion(&self, _c: Arc<Completion>) -> Result<()> {
57        todo!();
58    }
59
60    fn generate_random_number(&self) -> i64 {
61        let mut buf = [0u8; 8];
62        getrandom::fill(&mut buf).expect("getrandom failed");
63        i64::from_ne_bytes(buf)
64    }
65
66    fn get_memory_io(&self) -> Arc<MemoryIO> {
67        Arc::new(MemoryIO::new())
68    }
69}
70
71pub struct MemoryFile {
72    pages: UnsafeCell<BTreeMap<usize, MemPage>>,
73    size: Cell<usize>,
74}
75unsafe impl Send for MemoryFile {}
76unsafe impl Sync for MemoryFile {}
77
78impl File for MemoryFile {
79    fn lock_file(&self, _exclusive: bool) -> Result<()> {
80        Ok(())
81    }
82    fn unlock_file(&self) -> Result<()> {
83        Ok(())
84    }
85
86    fn pread(&self, pos: usize, c: Arc<Completion>) -> Result<()> {
87        let r = c.as_read();
88        let buf_len = r.buf().len();
89        if buf_len == 0 {
90            c.complete(0);
91            return Ok(());
92        }
93
94        let file_size = self.size.get();
95        if pos >= file_size {
96            c.complete(0);
97            return Ok(());
98        }
99
100        let read_len = buf_len.min(file_size - pos);
101        {
102            let mut read_buf = r.buf_mut();
103            let mut offset = pos;
104            let mut remaining = read_len;
105            let mut buf_offset = 0;
106
107            while remaining > 0 {
108                let page_no = offset / PAGE_SIZE;
109                let page_offset = offset % PAGE_SIZE;
110                let bytes_to_read = remaining.min(PAGE_SIZE - page_offset);
111                if let Some(page) = self.get_page(page_no) {
112                    read_buf.as_mut_slice()[buf_offset..buf_offset + bytes_to_read]
113                        .copy_from_slice(&page[page_offset..page_offset + bytes_to_read]);
114                } else {
115                    read_buf.as_mut_slice()[buf_offset..buf_offset + bytes_to_read].fill(0);
116                }
117
118                offset += bytes_to_read;
119                buf_offset += bytes_to_read;
120                remaining -= bytes_to_read;
121            }
122        }
123        c.complete(read_len as i32);
124        Ok(())
125    }
126
127    fn pwrite(&self, pos: usize, buffer: Arc<RefCell<Buffer>>, c: Arc<Completion>) -> Result<()> {
128        let buf = buffer.borrow();
129        let buf_len = buf.len();
130        if buf_len == 0 {
131            c.complete(0);
132            return Ok(());
133        }
134
135        let mut offset = pos;
136        let mut remaining = buf_len;
137        let mut buf_offset = 0;
138        let data = &buf.as_slice();
139
140        while remaining > 0 {
141            let page_no = offset / PAGE_SIZE;
142            let page_offset = offset % PAGE_SIZE;
143            let bytes_to_write = remaining.min(PAGE_SIZE - page_offset);
144
145            {
146                let page = self.get_or_allocate_page(page_no);
147                page[page_offset..page_offset + bytes_to_write]
148                    .copy_from_slice(&data[buf_offset..buf_offset + bytes_to_write]);
149            }
150
151            offset += bytes_to_write;
152            buf_offset += bytes_to_write;
153            remaining -= bytes_to_write;
154        }
155
156        self.size
157            .set(core::cmp::max(pos + buf_len, self.size.get()));
158
159        c.complete(buf_len as i32);
160        Ok(())
161    }
162
163    fn sync(&self, c: Arc<Completion>) -> Result<()> {
164        // no-op
165        c.complete(0);
166        Ok(())
167    }
168
169    fn size(&self) -> Result<u64> {
170        Ok(self.size.get() as u64)
171    }
172
173    fn truncate(&self, len: usize, c: Arc<Completion>) -> Result<()> {
174        // Drop any pages beyond the new length and shrink size.
175        let pages = unsafe { &mut *self.pages.get() };
176        let keep_pages = len.div_ceil(PAGE_SIZE);
177        pages.retain(|&page_no, _| page_no < keep_pages);
178        self.size.set(len);
179        c.complete(0);
180        Ok(())
181    }
182}
183
184impl Drop for MemoryFile {
185    fn drop(&mut self) {
186        // no-op
187    }
188}
189
190impl MemoryFile {
191    #[allow(clippy::mut_from_ref)]
192    fn get_or_allocate_page(&self, page_no: usize) -> &mut MemPage {
193        unsafe {
194            let pages = &mut *self.pages.get();
195            pages
196                .entry(page_no)
197                .or_insert_with(|| Box::new([0; PAGE_SIZE]))
198        }
199    }
200
201    fn get_page(&self, page_no: usize) -> Option<&MemPage> {
202        unsafe { (*self.pages.get()).get(&page_no) }
203    }
204}