quill_sql/storage/
disk_manager.rs

1use log::{debug, warn};
2use std::alloc::{alloc, alloc_zeroed, dealloc, Layout};
3use std::fs::File;
4use std::path::Path;
5use std::ptr::NonNull;
6use std::slice;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::RwLock;
9use std::{
10    io::{ErrorKind, Read, Seek, SeekFrom, Write},
11    sync::{Mutex, MutexGuard},
12};
13
14#[cfg(target_os = "linux")]
15use std::os::unix::fs::OpenOptionsExt;
16
17use crate::error::{QuillSQLError, QuillSQLResult};
18
19use crate::buffer::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
20use crate::storage::codec::FreelistPageCodec;
21use crate::storage::page::FreelistPage;
22use crate::storage::page::{decode_meta_page, encode_meta_page, MetaPage, META_PAGE_SIZE};
23
/// One page worth of zero bytes; written to disk whenever a page is freshly allocated or
/// deallocated so that stale contents never survive.
static EMPTY_PAGE: [u8; PAGE_SIZE] = [0; PAGE_SIZE];
25
/// Page-aligned buffer suitable for O_DIRECT transfers.
///
/// Owns a heap allocation of `PAGE_SIZE` bytes whose alignment is also `PAGE_SIZE`. The
/// allocation is released when the value is dropped.
pub(crate) struct AlignedPageBuf {
    /// Start of the owned allocation; never null.
    ptr: NonNull<u8>,
    /// Layout the allocation was made with, kept so deallocation uses the identical layout.
    layout: Layout,
}
31
32impl AlignedPageBuf {
33    pub(crate) fn new_zeroed() -> QuillSQLResult<Self> {
34        Self::allocate(true)
35    }
36
37    pub(crate) fn new_uninit() -> QuillSQLResult<Self> {
38        Self::allocate(false)
39    }
40
41    fn allocate(zeroed: bool) -> QuillSQLResult<Self> {
42        let layout = Layout::from_size_align(PAGE_SIZE, PAGE_SIZE)
43            .map_err(|_| QuillSQLError::Internal("Invalid PAGE_SIZE layout".into()))?;
44        let ptr = unsafe {
45            if zeroed {
46                alloc_zeroed(layout)
47            } else {
48                alloc(layout)
49            }
50        };
51        let Some(non_null) = NonNull::new(ptr) else {
52            return Err(QuillSQLError::Internal("Aligned allocation failed".into()));
53        };
54        Ok(Self {
55            ptr: non_null,
56            layout,
57        })
58    }
59
60    pub(crate) fn as_slice(&self) -> &[u8] {
61        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), PAGE_SIZE) }
62    }
63
64    pub(crate) fn as_mut_slice(&mut self) -> &mut [u8] {
65        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), PAGE_SIZE) }
66    }
67
68    pub(crate) fn ptr(&self) -> *mut u8 {
69        self.ptr.as_ptr()
70    }
71}
72
impl Drop for AlignedPageBuf {
    /// Returns the page allocation to the global allocator.
    fn drop(&mut self) {
        // SAFETY: `self.ptr` came from the global allocator with `self.layout` (see
        // `AlignedPageBuf::allocate`), and this is the only place it is freed.
        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
    }
}
78
/// Page-granular access to the single database file: a meta page at offset 0 followed by
/// fixed-size pages numbered from 1.
#[derive(Debug)]
pub struct DiskManager {
    /// Id that the next brand-new (non-recycled) page will receive.
    next_page_id: AtomicU32,
    /// Handle of the database file; the mutex serializes every seek + read/write pair.
    db_file: Mutex<File>,
    /// In-memory copy of the meta page stored at the start of the file.
    pub meta: RwLock<MetaPage>,
    /// Whether `db_file` was opened with `O_DIRECT`.
    direct_io: bool,
}
86
87impl DiskManager {
88    fn open_raw_file(db_path: &Path, create: bool, direct: bool) -> std::io::Result<(File, bool)> {
89        let mut request_direct = direct
90            && !std::env::var("QUILL_DISABLE_DIRECT_IO")
91                .map_or(false, |v| v == "1" || v.eq_ignore_ascii_case("true"));
92
93        #[cfg(target_os = "linux")]
94        if request_direct {
95            let mut direct_opts = std::fs::OpenOptions::new();
96            direct_opts.read(true).write(true);
97            if create {
98                direct_opts.create(true);
99            }
100            direct_opts.custom_flags(libc::O_DIRECT | libc::O_NOATIME);
101            match direct_opts.open(db_path) {
102                Ok(file) => return Ok((file, true)),
103                Err(err) => {
104                    let needs_fallback = err.kind() == ErrorKind::InvalidInput
105                        || err.raw_os_error() == Some(libc::EINVAL);
106                    if !needs_fallback {
107                        return Err(err);
108                    }
109                    debug!(
110                        "Direct I/O not available for {}: {}. Falling back to buffered mode",
111                        db_path.display(),
112                        err
113                    );
114                    request_direct = false;
115                }
116            }
117        }
118
119        let mut options = std::fs::OpenOptions::new();
120        options.read(true).write(true);
121        if create {
122            options.create(true);
123        }
124        #[cfg(target_os = "linux")]
125        if request_direct {
126            options.custom_flags(libc::O_NOATIME);
127        }
128
129        options.open(db_path).map(|file| {
130            #[cfg(target_os = "linux")]
131            {
132                (file, request_direct)
133            }
134            #[cfg(not(target_os = "linux"))]
135            {
136                let _ = request_direct;
137                (file, false)
138            }
139        })
140    }
141
142    pub fn try_new(db_path: impl AsRef<Path>) -> QuillSQLResult<Self> {
143        let mut is_new_file = false;
144        let db_path_ref = db_path.as_ref();
145        let (db_file, meta, direct_enabled) = if db_path_ref.exists() {
146            let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, false, true)?;
147            let meta_page = if direct_ok {
148                let mut aligned = AlignedPageBuf::new_zeroed()?;
149                match db_file.read_exact(aligned.as_mut_slice()) {
150                    Ok(()) => {
151                        let (meta_page, _) = decode_meta_page(aligned.as_slice())?;
152                        meta_page
153                    }
154                    Err(err) => {
155                        if err.kind() == ErrorKind::InvalidInput
156                            || err.raw_os_error() == Some(libc::EINVAL)
157                        {
158                            let (mut fallback_file, _) =
159                                Self::open_raw_file(db_path_ref, false, false)?;
160                            let mut buf = vec![0; *META_PAGE_SIZE];
161                            fallback_file.read_exact(&mut buf)?;
162                            let (meta_page, _) = decode_meta_page(&buf)?;
163                            db_file = fallback_file;
164                            direct_ok = false;
165                            meta_page
166                        } else {
167                            return Err(err.into());
168                        }
169                    }
170                }
171            } else {
172                let mut buf = vec![0; *META_PAGE_SIZE];
173                db_file.read_exact(&mut buf)?;
174                let (meta_page, _) = decode_meta_page(&buf)?;
175                meta_page
176            };
177            (db_file, meta_page, direct_ok)
178        } else {
179            is_new_file = true;
180            let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, true, true)?;
181            let meta_page = MetaPage::try_new()?;
182            let meta_bytes = encode_meta_page(&meta_page);
183
184            if direct_ok {
185                let mut aligned = AlignedPageBuf::new_zeroed()?;
186                aligned.as_mut_slice().copy_from_slice(&meta_bytes);
187
188                if let Err(err) = db_file.write_all(aligned.as_slice()) {
189                    let mut need_fallback = err.kind() == ErrorKind::InvalidInput;
190                    #[cfg(target_os = "linux")]
191                    {
192                        if !need_fallback && err.raw_os_error() == Some(libc::EINVAL) {
193                            need_fallback = true;
194                        }
195                    }
196
197                    if need_fallback {
198                        let (mut fallback_file, _) =
199                            Self::open_raw_file(db_path_ref, false, false)?;
200                        fallback_file.write_all(&meta_bytes)?;
201                        db_file = fallback_file;
202                        direct_ok = false;
203                    } else {
204                        return Err(err.into());
205                    }
206                }
207            } else {
208                db_file.write_all(&meta_bytes)?;
209            }
210
211            (db_file, meta_page, direct_ok)
212        };
213
214        // calculate next page id
215        let db_file_len = db_file.metadata()?.len();
216        if (db_file_len - *META_PAGE_SIZE as u64) % PAGE_SIZE as u64 != 0 {
217            return Err(QuillSQLError::Internal(format!(
218                "db file size not a multiple of {} + meta page size {}",
219                PAGE_SIZE, *META_PAGE_SIZE,
220            )));
221        }
222        let next_page_id =
223            (((db_file_len - *META_PAGE_SIZE as u64) / PAGE_SIZE as u64) + 1) as PageId;
224        debug!("Initialized disk_manager next_page_id: {}", next_page_id);
225
226        let disk_manager = Self {
227            next_page_id: AtomicU32::new(next_page_id),
228            // Use a mutex to wrap the file handle to ensure that only one thread
229            // can access the file at the same time among multiple threads.
230            db_file: Mutex::new(db_file),
231            meta: RwLock::new(meta),
232            direct_io: direct_enabled,
233        };
234
235        #[cfg(target_os = "linux")]
236        if !direct_enabled {
237            warn!("DiskManager running without O_DIRECT; expect OS page cache usage");
238        }
239
240        // new pages
241        if is_new_file {
242            let freelist_page_id = disk_manager.allocate_freelist_page()?;
243            let information_schema_schemas_first_page_id = disk_manager.allocate_page()?;
244            let information_schema_tables_first_page_id = disk_manager.allocate_page()?;
245            let information_schema_columns_first_page_id = disk_manager.allocate_page()?;
246            let information_schema_indexes_first_page_id = disk_manager.allocate_page()?;
247
248            let mut meta = disk_manager.meta.write().unwrap();
249            meta.freelist_page_id = freelist_page_id;
250            meta.information_schema_schemas_first_page_id =
251                information_schema_schemas_first_page_id;
252            meta.information_schema_tables_first_page_id = information_schema_tables_first_page_id;
253            meta.information_schema_columns_first_page_id =
254                information_schema_columns_first_page_id;
255            meta.information_schema_indexes_first_page_id =
256                information_schema_indexes_first_page_id;
257            drop(meta);
258            disk_manager.write_meta_page()?;
259        }
260        debug!(
261            "disk_manager meta page: {:?}",
262            disk_manager.meta.read().unwrap()
263        );
264
265        Ok(disk_manager)
266    }
267
268    pub fn read_page(&self, page_id: PageId) -> QuillSQLResult<[u8; PAGE_SIZE]> {
269        if page_id == crate::buffer::INVALID_PAGE_ID {
270            return Err(QuillSQLError::Storage(
271                "read_page: invalid page id".to_string(),
272            ));
273        }
274        let mut guard = self.db_file.lock().unwrap();
275        let mut aligned = AlignedPageBuf::new_zeroed()?;
276
277        guard.seek(SeekFrom::Start(
278            (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
279        ))?;
280        guard.read_exact(aligned.as_mut_slice())?;
281
282        let mut page = [0u8; PAGE_SIZE];
283        page.copy_from_slice(aligned.as_slice());
284        Ok(page)
285    }
286
287    pub fn write_page(&self, page_id: PageId, data: &[u8]) -> QuillSQLResult<()> {
288        if page_id == crate::buffer::INVALID_PAGE_ID {
289            return Err(QuillSQLError::Storage(
290                "write_page: invalid page id".to_string(),
291            ));
292        }
293        if data.len() != PAGE_SIZE {
294            return Err(QuillSQLError::Internal(format!(
295                "Page size is not {}",
296                PAGE_SIZE
297            )));
298        }
299        let mut guard = self.db_file.lock().unwrap();
300        self.write_page_internal(&mut guard, page_id, data)
301    }
302
303    pub fn allocate_page(&self) -> QuillSQLResult<PageId> {
304        if let Some(page_id) = self.freelist_pop()? {
305            Ok(page_id)
306        } else {
307            let mut guard = self.db_file.lock().unwrap();
308
309            // fetch current value and increment page id
310            let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);
311
312            // Write an empty page (all zeros) to the allocated page.
313            self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
314
315            Ok(page_id)
316        }
317    }
318
319    pub fn allocate_freelist_page(&self) -> QuillSQLResult<PageId> {
320        let page_id = self.allocate_page()?;
321        let freelist_page = FreelistPage::new();
322        self.write_page(page_id, &FreelistPageCodec::encode(&freelist_page))?;
323        Ok(page_id)
324    }
325
326    pub fn deallocate_page(&self, page_id: PageId) -> QuillSQLResult<()> {
327        if page_id == crate::buffer::INVALID_PAGE_ID {
328            return Err(QuillSQLError::Storage(
329                "deallocate_page: invalid page id".to_string(),
330            ));
331        }
332        // Write an empty page (all zeros) to the deallocated page.
333        // But this page is not deallocated, only data will be written with null or zeros.
334        let mut guard = self.db_file.lock().unwrap();
335        self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
336        drop(guard);
337
338        self.freelist_push(page_id)?;
339        Ok(())
340    }
341
342    fn freelist_push(&self, page_id: PageId) -> QuillSQLResult<()> {
343        let mut curr_page_id = INVALID_PAGE_ID;
344        let mut next_page_id = self.meta.read().unwrap().freelist_page_id;
345        loop {
346            let mut freelist_page = if next_page_id == INVALID_PAGE_ID {
347                next_page_id = self.allocate_freelist_page()?;
348                if curr_page_id != INVALID_PAGE_ID {
349                    let (mut curr_freelist_page, _) =
350                        FreelistPageCodec::decode(&self.read_page(curr_page_id)?)?;
351                    curr_freelist_page.header.next_page_id = next_page_id;
352                    self.write_page(
353                        curr_page_id,
354                        &FreelistPageCodec::encode(&curr_freelist_page),
355                    )?;
356                }
357
358                FreelistPage::new()
359            } else {
360                let (freelist_page, _) = FreelistPageCodec::decode(&self.read_page(next_page_id)?)?;
361                freelist_page
362            };
363
364            if freelist_page.is_full() {
365                curr_page_id = next_page_id;
366                next_page_id = freelist_page.header.next_page_id;
367            } else {
368                freelist_page.push(page_id);
369                // persist page data
370                self.write_page(next_page_id, &FreelistPageCodec::encode(&freelist_page))?;
371                break;
372            }
373        }
374        Ok(())
375    }
376
377    fn freelist_pop(&self) -> QuillSQLResult<Option<PageId>> {
378        let mut freelist_page_id = self.meta.read().unwrap().freelist_page_id;
379        loop {
380            if freelist_page_id != INVALID_PAGE_ID {
381                let (mut freelist_page, _) =
382                    FreelistPageCodec::decode(&self.read_page(freelist_page_id)?)?;
383                if let Some(page_id) = freelist_page.pop() {
384                    self.write_page(freelist_page_id, &FreelistPageCodec::encode(&freelist_page))?;
385                    return Ok(Some(page_id));
386                } else {
387                    freelist_page_id = freelist_page.header.next_page_id;
388                }
389            } else {
390                return Ok(None);
391            }
392        }
393    }
394
395    fn write_meta_page(&self) -> QuillSQLResult<()> {
396        let mut guard = self.db_file.lock().unwrap();
397        guard.seek(std::io::SeekFrom::Start(0))?;
398        let encoded = encode_meta_page(&self.meta.read().unwrap());
399        if self.direct_io && encoded.as_ptr() as usize % PAGE_SIZE != 0 {
400            let mut aligned = AlignedPageBuf::new_zeroed()?;
401            aligned.as_mut_slice().copy_from_slice(&encoded);
402            guard.write_all(aligned.as_slice())?;
403        } else {
404            guard.write_all(&encoded)?;
405        }
406        if !self.direct_io {
407            guard.flush()?;
408        }
409        Ok(())
410    }
411
412    fn write_page_internal(
413        &self,
414        guard: &mut MutexGuard<File>,
415        page_id: PageId,
416        data: &[u8],
417    ) -> QuillSQLResult<()> {
418        // Seek to the start of the page in the database file and write the data.
419        guard.seek(SeekFrom::Start(
420            (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
421        ))?;
422
423        if data.as_ptr() as usize % PAGE_SIZE == 0 {
424            guard.write_all(data)?;
425        } else {
426            let mut aligned = AlignedPageBuf::new_zeroed()?;
427            aligned.as_mut_slice().copy_from_slice(data);
428            guard.write_all(aligned.as_slice())?;
429        }
430        if !self.direct_io {
431            guard.flush()?;
432        }
433        Ok(())
434    }
435
436    pub fn db_file_len(&self) -> QuillSQLResult<u64> {
437        let guard = self.db_file.lock().unwrap();
438        let meta = guard.metadata()?;
439        Ok(meta.len())
440    }
441
442    #[cfg(target_os = "linux")]
443    pub fn try_clone_db_file(&self) -> QuillSQLResult<File> {
444        let guard = self.db_file.lock().unwrap();
445        let cloned = guard.try_clone()?;
446        Ok(cloned)
447    }
448}
449
#[cfg(test)]
mod tests {
    use crate::buffer::PAGE_SIZE;
    use tempfile::TempDir;

    /// Pages written through the disk manager read back unchanged, and the file grows by
    /// exactly one page per fresh allocation.
    #[test]
    pub fn test_disk_manager_write_read_page() {
        let scratch_dir = TempDir::new().unwrap();
        let disk_manager = super::DiskManager::try_new(scratch_dir.path().join("test.db")).unwrap();

        // Creating the database already allocated pages 1..=5 (the freelist page and the
        // four information_schema pages), so the first user page is 6.
        let first_id = disk_manager.allocate_page().unwrap();
        assert_eq!(first_id, 6);
        let mut first_payload = vec![0u8; PAGE_SIZE];
        first_payload[..3].copy_from_slice(&[1, 2, 3]);
        disk_manager.write_page(first_id, &first_payload).unwrap();
        let read_back = disk_manager.read_page(first_id).unwrap();
        assert_eq!(read_back, first_payload.as_slice());

        let second_id = disk_manager.allocate_page().unwrap();
        assert_eq!(second_id, 7);
        let mut second_payload = vec![0u8; PAGE_SIZE];
        second_payload[PAGE_SIZE - 3..].copy_from_slice(&[4, 5, 6]);
        disk_manager.write_page(second_id, &second_payload).unwrap();
        let read_back = disk_manager.read_page(second_id).unwrap();
        assert_eq!(read_back, second_payload.as_slice());

        // Seven data pages plus the leading meta page.
        let file_len = disk_manager.db_file_len().unwrap();
        assert_eq!(file_len as usize, PAGE_SIZE * 7 + PAGE_SIZE);
    }

    /// A deallocated page id is handed out again by the next allocation.
    #[test]
    pub fn test_disk_manager_freelist() {
        let scratch_dir = TempDir::new().unwrap();
        let disk_manager = super::DiskManager::try_new(scratch_dir.path().join("test.db")).unwrap();

        let released_id = disk_manager.allocate_page().unwrap();
        let _kept_a = disk_manager.allocate_page().unwrap();
        let _kept_b = disk_manager.allocate_page().unwrap();

        disk_manager.deallocate_page(released_id).unwrap();

        let reused_id = disk_manager.allocate_page().unwrap();
        assert_eq!(released_id, reused_id);
    }
}
498}