quill_sql/storage/
disk_manager.rs

1use log::{debug, warn};
2use std::alloc::{alloc, alloc_zeroed, dealloc, Layout};
3use std::fs::File;
4use std::path::Path;
5use std::ptr::NonNull;
6use std::slice;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::RwLock;
9use std::{
10    io::{ErrorKind, Read, Seek, SeekFrom, Write},
11    sync::{Mutex, MutexGuard},
12};
13
14#[cfg(target_os = "linux")]
15use std::os::unix::fs::OpenOptionsExt;
16
17use crate::error::{QuillSQLError, QuillSQLResult};
18
19use crate::buffer::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
20use crate::storage::codec::FreelistPageCodec;
21use crate::storage::page::FreelistPage;
22use crate::storage::page::{decode_meta_page, encode_meta_page, MetaPage, META_PAGE_SIZE};
23
24static EMPTY_PAGE: [u8; PAGE_SIZE] = [0; PAGE_SIZE];
25
26/// Page-aligned buffer suitable for O_DIRECT transfers.
27pub(crate) struct AlignedPageBuf {
28    ptr: NonNull<u8>,
29    layout: Layout,
30}
31
32impl AlignedPageBuf {
33    pub(crate) fn new_zeroed() -> QuillSQLResult<Self> {
34        Self::allocate(true)
35    }
36
37    #[allow(dead_code)]
38    pub(crate) fn new_uninit() -> QuillSQLResult<Self> {
39        Self::allocate(false)
40    }
41
42    fn allocate(zeroed: bool) -> QuillSQLResult<Self> {
43        let layout = Layout::from_size_align(PAGE_SIZE, PAGE_SIZE)
44            .map_err(|_| QuillSQLError::Internal("Invalid PAGE_SIZE layout".into()))?;
45        let ptr = unsafe {
46            if zeroed {
47                alloc_zeroed(layout)
48            } else {
49                alloc(layout)
50            }
51        };
52        let Some(non_null) = NonNull::new(ptr) else {
53            return Err(QuillSQLError::Internal("Aligned allocation failed".into()));
54        };
55        Ok(Self {
56            ptr: non_null,
57            layout,
58        })
59    }
60
61    pub(crate) fn as_slice(&self) -> &[u8] {
62        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), PAGE_SIZE) }
63    }
64
65    pub(crate) fn as_mut_slice(&mut self) -> &mut [u8] {
66        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), PAGE_SIZE) }
67    }
68
69    pub(crate) fn ptr(&self) -> *mut u8 {
70        self.ptr.as_ptr()
71    }
72}
73
74impl Drop for AlignedPageBuf {
75    fn drop(&mut self) {
76        unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
77    }
78}
79
80#[derive(Debug)]
81pub struct DiskManager {
82    next_page_id: AtomicU32,
83    db_file: Mutex<File>,
84    pub meta: RwLock<MetaPage>,
85    direct_io: bool,
86}
87
88impl DiskManager {
89    fn open_raw_file(db_path: &Path, create: bool, direct: bool) -> std::io::Result<(File, bool)> {
90        let mut request_direct = direct
91            && !std::env::var("QUILL_DISABLE_DIRECT_IO")
92                .map_or(false, |v| v == "1" || v.eq_ignore_ascii_case("true"));
93
94        #[cfg(target_os = "linux")]
95        if request_direct {
96            let mut direct_opts = std::fs::OpenOptions::new();
97            direct_opts.read(true).write(true);
98            if create {
99                direct_opts.create(true);
100            }
101            direct_opts.custom_flags(libc::O_DIRECT | libc::O_NOATIME);
102            match direct_opts.open(db_path) {
103                Ok(file) => return Ok((file, true)),
104                Err(err) => {
105                    let needs_fallback = err.kind() == ErrorKind::InvalidInput
106                        || err.raw_os_error() == Some(libc::EINVAL);
107                    if !needs_fallback {
108                        return Err(err);
109                    }
110                    debug!(
111                        "Direct I/O not available for {}: {}. Falling back to buffered mode",
112                        db_path.display(),
113                        err
114                    );
115                    request_direct = false;
116                }
117            }
118        }
119
120        let mut options = std::fs::OpenOptions::new();
121        options.read(true).write(true);
122        if create {
123            options.create(true);
124        }
125        #[cfg(target_os = "linux")]
126        if request_direct {
127            options.custom_flags(libc::O_NOATIME);
128        }
129
130        options.open(db_path).map(|file| {
131            #[cfg(target_os = "linux")]
132            {
133                (file, request_direct)
134            }
135            #[cfg(not(target_os = "linux"))]
136            {
137                let _ = request_direct;
138                (file, false)
139            }
140        })
141    }
142
143    pub fn try_new(db_path: impl AsRef<Path>) -> QuillSQLResult<Self> {
144        let mut is_new_file = false;
145        let db_path_ref = db_path.as_ref();
146        let (db_file, meta, direct_enabled) = if db_path_ref.exists() {
147            let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, false, true)?;
148            let meta_page = if direct_ok {
149                let mut aligned = AlignedPageBuf::new_zeroed()?;
150                match db_file.read_exact(aligned.as_mut_slice()) {
151                    Ok(()) => {
152                        let (meta_page, _) = decode_meta_page(aligned.as_slice())?;
153                        meta_page
154                    }
155                    Err(err) => {
156                        if err.kind() == ErrorKind::InvalidInput
157                            || err.raw_os_error() == Some(libc::EINVAL)
158                        {
159                            let (mut fallback_file, _) =
160                                Self::open_raw_file(db_path_ref, false, false)?;
161                            let mut buf = vec![0; *META_PAGE_SIZE];
162                            fallback_file.read_exact(&mut buf)?;
163                            let (meta_page, _) = decode_meta_page(&buf)?;
164                            db_file = fallback_file;
165                            direct_ok = false;
166                            meta_page
167                        } else {
168                            return Err(err.into());
169                        }
170                    }
171                }
172            } else {
173                let mut buf = vec![0; *META_PAGE_SIZE];
174                db_file.read_exact(&mut buf)?;
175                let (meta_page, _) = decode_meta_page(&buf)?;
176                meta_page
177            };
178            (db_file, meta_page, direct_ok)
179        } else {
180            is_new_file = true;
181            let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, true, true)?;
182            let meta_page = MetaPage::try_new()?;
183            let meta_bytes = encode_meta_page(&meta_page);
184
185            if direct_ok {
186                let mut aligned = AlignedPageBuf::new_zeroed()?;
187                aligned.as_mut_slice().copy_from_slice(&meta_bytes);
188
189                if let Err(err) = db_file.write_all(aligned.as_slice()) {
190                    let mut need_fallback = err.kind() == ErrorKind::InvalidInput;
191                    #[cfg(target_os = "linux")]
192                    {
193                        if !need_fallback && err.raw_os_error() == Some(libc::EINVAL) {
194                            need_fallback = true;
195                        }
196                    }
197
198                    if need_fallback {
199                        let (mut fallback_file, _) =
200                            Self::open_raw_file(db_path_ref, false, false)?;
201                        fallback_file.write_all(&meta_bytes)?;
202                        db_file = fallback_file;
203                        direct_ok = false;
204                    } else {
205                        return Err(err.into());
206                    }
207                }
208            } else {
209                db_file.write_all(&meta_bytes)?;
210            }
211
212            (db_file, meta_page, direct_ok)
213        };
214
215        // calculate next page id
216        let db_file_len = db_file.metadata()?.len();
217        if (db_file_len - *META_PAGE_SIZE as u64) % PAGE_SIZE as u64 != 0 {
218            return Err(QuillSQLError::Internal(format!(
219                "db file size not a multiple of {} + meta page size {}",
220                PAGE_SIZE, *META_PAGE_SIZE,
221            )));
222        }
223        let next_page_id =
224            (((db_file_len - *META_PAGE_SIZE as u64) / PAGE_SIZE as u64) + 1) as PageId;
225        debug!("Initialized disk_manager next_page_id: {}", next_page_id);
226
227        let disk_manager = Self {
228            next_page_id: AtomicU32::new(next_page_id),
229            // Use a mutex to wrap the file handle to ensure that only one thread
230            // can access the file at the same time among multiple threads.
231            db_file: Mutex::new(db_file),
232            meta: RwLock::new(meta),
233            direct_io: direct_enabled,
234        };
235
236        #[cfg(target_os = "linux")]
237        if !direct_enabled {
238            warn!("DiskManager running without O_DIRECT; expect OS page cache usage");
239        }
240
241        // new pages
242        if is_new_file {
243            let freelist_page_id = disk_manager.allocate_freelist_page()?;
244            let information_schema_schemas_first_page_id = disk_manager.allocate_page()?;
245            let information_schema_tables_first_page_id = disk_manager.allocate_page()?;
246            let information_schema_columns_first_page_id = disk_manager.allocate_page()?;
247            let information_schema_indexes_first_page_id = disk_manager.allocate_page()?;
248
249            let mut meta = disk_manager.meta.write().unwrap();
250            meta.freelist_page_id = freelist_page_id;
251            meta.information_schema_schemas_first_page_id =
252                information_schema_schemas_first_page_id;
253            meta.information_schema_tables_first_page_id = information_schema_tables_first_page_id;
254            meta.information_schema_columns_first_page_id =
255                information_schema_columns_first_page_id;
256            meta.information_schema_indexes_first_page_id =
257                information_schema_indexes_first_page_id;
258            drop(meta);
259            disk_manager.write_meta_page()?;
260        }
261        debug!(
262            "disk_manager meta page: {:?}",
263            disk_manager.meta.read().unwrap()
264        );
265
266        Ok(disk_manager)
267    }
268
269    pub fn read_page(&self, page_id: PageId) -> QuillSQLResult<[u8; PAGE_SIZE]> {
270        if page_id == crate::buffer::INVALID_PAGE_ID {
271            return Err(QuillSQLError::Storage(
272                "read_page: invalid page id".to_string(),
273            ));
274        }
275        let mut guard = self.db_file.lock().unwrap();
276        let mut aligned = AlignedPageBuf::new_zeroed()?;
277
278        guard.seek(SeekFrom::Start(
279            (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
280        ))?;
281        guard.read_exact(aligned.as_mut_slice())?;
282
283        let mut page = [0u8; PAGE_SIZE];
284        page.copy_from_slice(aligned.as_slice());
285        Ok(page)
286    }
287
288    pub fn write_page(&self, page_id: PageId, data: &[u8]) -> QuillSQLResult<()> {
289        if page_id == crate::buffer::INVALID_PAGE_ID {
290            return Err(QuillSQLError::Storage(
291                "write_page: invalid page id".to_string(),
292            ));
293        }
294        if data.len() != PAGE_SIZE {
295            return Err(QuillSQLError::Internal(format!(
296                "Page size is not {}",
297                PAGE_SIZE
298            )));
299        }
300        let mut guard = self.db_file.lock().unwrap();
301        self.write_page_internal(&mut guard, page_id, data)
302    }
303
304    pub fn allocate_page(&self) -> QuillSQLResult<PageId> {
305        if let Some(page_id) = self.freelist_pop()? {
306            Ok(page_id)
307        } else {
308            let mut guard = self.db_file.lock().unwrap();
309
310            // fetch current value and increment page id
311            let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);
312
313            // Write an empty page (all zeros) to the allocated page.
314            self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
315
316            Ok(page_id)
317        }
318    }
319
320    pub fn allocate_freelist_page(&self) -> QuillSQLResult<PageId> {
321        let page_id = self.allocate_page()?;
322        let freelist_page = FreelistPage::new();
323        self.write_page(page_id, &FreelistPageCodec::encode(&freelist_page))?;
324        Ok(page_id)
325    }
326
327    pub fn deallocate_page(&self, page_id: PageId) -> QuillSQLResult<()> {
328        if page_id == crate::buffer::INVALID_PAGE_ID {
329            return Err(QuillSQLError::Storage(
330                "deallocate_page: invalid page id".to_string(),
331            ));
332        }
333        // Write an empty page (all zeros) to the deallocated page.
334        // But this page is not deallocated, only data will be written with null or zeros.
335        let mut guard = self.db_file.lock().unwrap();
336        self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
337        drop(guard);
338
339        self.freelist_push(page_id)?;
340        Ok(())
341    }
342
343    fn freelist_push(&self, page_id: PageId) -> QuillSQLResult<()> {
344        let mut curr_page_id = INVALID_PAGE_ID;
345        let mut next_page_id = self.meta.read().unwrap().freelist_page_id;
346        loop {
347            let mut freelist_page = if next_page_id == INVALID_PAGE_ID {
348                next_page_id = self.allocate_freelist_page()?;
349                if curr_page_id != INVALID_PAGE_ID {
350                    let (mut curr_freelist_page, _) =
351                        FreelistPageCodec::decode(&self.read_page(curr_page_id)?)?;
352                    curr_freelist_page.header.next_page_id = next_page_id;
353                    self.write_page(
354                        curr_page_id,
355                        &FreelistPageCodec::encode(&curr_freelist_page),
356                    )?;
357                }
358
359                FreelistPage::new()
360            } else {
361                let (freelist_page, _) = FreelistPageCodec::decode(&self.read_page(next_page_id)?)?;
362                freelist_page
363            };
364
365            if freelist_page.is_full() {
366                curr_page_id = next_page_id;
367                next_page_id = freelist_page.header.next_page_id;
368            } else {
369                freelist_page.push(page_id);
370                // persist page data
371                self.write_page(next_page_id, &FreelistPageCodec::encode(&freelist_page))?;
372                break;
373            }
374        }
375        Ok(())
376    }
377
378    fn freelist_pop(&self) -> QuillSQLResult<Option<PageId>> {
379        let mut freelist_page_id = self.meta.read().unwrap().freelist_page_id;
380        loop {
381            if freelist_page_id != INVALID_PAGE_ID {
382                let (mut freelist_page, _) =
383                    FreelistPageCodec::decode(&self.read_page(freelist_page_id)?)?;
384                if let Some(page_id) = freelist_page.pop() {
385                    self.write_page(freelist_page_id, &FreelistPageCodec::encode(&freelist_page))?;
386                    return Ok(Some(page_id));
387                } else {
388                    freelist_page_id = freelist_page.header.next_page_id;
389                }
390            } else {
391                return Ok(None);
392            }
393        }
394    }
395
396    fn write_meta_page(&self) -> QuillSQLResult<()> {
397        let mut guard = self.db_file.lock().unwrap();
398        guard.seek(std::io::SeekFrom::Start(0))?;
399        let encoded = encode_meta_page(&self.meta.read().unwrap());
400        if self.direct_io && encoded.as_ptr() as usize % PAGE_SIZE != 0 {
401            let mut aligned = AlignedPageBuf::new_zeroed()?;
402            aligned.as_mut_slice().copy_from_slice(&encoded);
403            guard.write_all(aligned.as_slice())?;
404        } else {
405            guard.write_all(&encoded)?;
406        }
407        if !self.direct_io {
408            guard.flush()?;
409        }
410        Ok(())
411    }
412
413    fn write_page_internal(
414        &self,
415        guard: &mut MutexGuard<File>,
416        page_id: PageId,
417        data: &[u8],
418    ) -> QuillSQLResult<()> {
419        // Seek to the start of the page in the database file and write the data.
420        guard.seek(SeekFrom::Start(
421            (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
422        ))?;
423
424        if data.as_ptr() as usize % PAGE_SIZE == 0 {
425            guard.write_all(data)?;
426        } else {
427            let mut aligned = AlignedPageBuf::new_zeroed()?;
428            aligned.as_mut_slice().copy_from_slice(data);
429            guard.write_all(aligned.as_slice())?;
430        }
431        if !self.direct_io {
432            guard.flush()?;
433        }
434        Ok(())
435    }
436
437    pub fn db_file_len(&self) -> QuillSQLResult<u64> {
438        let guard = self.db_file.lock().unwrap();
439        let meta = guard.metadata()?;
440        Ok(meta.len())
441    }
442
443    #[cfg(target_os = "linux")]
444    pub fn try_clone_db_file(&self) -> QuillSQLResult<File> {
445        let guard = self.db_file.lock().unwrap();
446        let cloned = guard.try_clone()?;
447        Ok(cloned)
448    }
449}
450
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use crate::buffer::PAGE_SIZE;
    use tempfile::TempDir;

    /// Creates a brand-new database inside a fresh temporary directory.
    ///
    /// The directory handle is returned as well so that it outlives the
    /// `DiskManager` and is only cleaned up at the end of the test.
    fn fresh_disk_manager() -> (TempDir, DiskManager) {
        let temp_dir = TempDir::new().unwrap();
        let disk_manager = DiskManager::try_new(temp_dir.path().join("test.db")).unwrap();
        (temp_dir, disk_manager)
    }

    #[test]
    pub fn test_disk_manager_write_read_page() {
        let (_temp_dir, disk_manager) = fresh_disk_manager();

        // Five pages are allocated while bootstrapping a new file, so the
        // first user page receives id 6.
        let first_id = disk_manager.allocate_page().unwrap();
        assert_eq!(first_id, 6);
        let mut first_page = vec![0u8; PAGE_SIZE];
        first_page[..3].copy_from_slice(&[1, 2, 3]);
        disk_manager.write_page(first_id, &first_page).unwrap();
        let read_back = disk_manager.read_page(first_id).unwrap();
        assert_eq!(read_back, first_page.as_slice());

        let second_id = disk_manager.allocate_page().unwrap();
        assert_eq!(second_id, 7);
        let mut second_page = vec![0u8; PAGE_SIZE];
        second_page[PAGE_SIZE - 3..].copy_from_slice(&[4, 5, 6]);
        disk_manager.write_page(second_id, &second_page).unwrap();
        let read_back = disk_manager.read_page(second_id).unwrap();
        assert_eq!(read_back, second_page.as_slice());

        // Seven data pages plus the meta page; the expected value assumes the
        // meta page occupies PAGE_SIZE bytes.
        let db_file_len = disk_manager.db_file_len().unwrap();
        assert_eq!(db_file_len as usize, PAGE_SIZE * 7 + PAGE_SIZE);
    }

    #[test]
    pub fn test_disk_manager_freelist() {
        let (_temp_dir, disk_manager) = fresh_disk_manager();

        let released_id = disk_manager.allocate_page().unwrap();
        let _second_id = disk_manager.allocate_page().unwrap();
        let _third_id = disk_manager.allocate_page().unwrap();

        disk_manager.deallocate_page(released_id).unwrap();

        // The released id must be handed out again by the next allocation.
        let reused_id = disk_manager.allocate_page().unwrap();
        assert_eq!(released_id, reused_id);
    }
}