1use log::{debug, warn};
2use std::alloc::{alloc, alloc_zeroed, dealloc, Layout};
3use std::fs::File;
4use std::path::Path;
5use std::ptr::NonNull;
6use std::slice;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::RwLock;
9use std::{
10 io::{ErrorKind, Read, Seek, SeekFrom, Write},
11 sync::{Mutex, MutexGuard},
12};
13
14#[cfg(target_os = "linux")]
15use std::os::unix::fs::OpenOptionsExt;
16
17use crate::error::{QuillSQLError, QuillSQLResult};
18
19use crate::buffer::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
20use crate::storage::codec::FreelistPageCodec;
21use crate::storage::page::FreelistPage;
22use crate::storage::page::{decode_meta_page, encode_meta_page, MetaPage, META_PAGE_SIZE};
23
24static EMPTY_PAGE: [u8; PAGE_SIZE] = [0; PAGE_SIZE];
25
/// Heap buffer of exactly one page whose start address is aligned to
/// `PAGE_SIZE`, used as the staging area for file reads and writes.
pub(crate) struct AlignedPageBuf {
    /// Start of the allocation; never null.
    ptr: NonNull<u8>,
    /// Layout the allocation was requested with; needed again to free it.
    layout: Layout,
}
31
32impl AlignedPageBuf {
33 pub(crate) fn new_zeroed() -> QuillSQLResult<Self> {
34 Self::allocate(true)
35 }
36
37 pub(crate) fn new_uninit() -> QuillSQLResult<Self> {
38 Self::allocate(false)
39 }
40
41 fn allocate(zeroed: bool) -> QuillSQLResult<Self> {
42 let layout = Layout::from_size_align(PAGE_SIZE, PAGE_SIZE)
43 .map_err(|_| QuillSQLError::Internal("Invalid PAGE_SIZE layout".into()))?;
44 let ptr = unsafe {
45 if zeroed {
46 alloc_zeroed(layout)
47 } else {
48 alloc(layout)
49 }
50 };
51 let Some(non_null) = NonNull::new(ptr) else {
52 return Err(QuillSQLError::Internal("Aligned allocation failed".into()));
53 };
54 Ok(Self {
55 ptr: non_null,
56 layout,
57 })
58 }
59
60 pub(crate) fn as_slice(&self) -> &[u8] {
61 unsafe { slice::from_raw_parts(self.ptr.as_ptr(), PAGE_SIZE) }
62 }
63
64 pub(crate) fn as_mut_slice(&mut self) -> &mut [u8] {
65 unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), PAGE_SIZE) }
66 }
67
68 pub(crate) fn ptr(&self) -> *mut u8 {
69 self.ptr.as_ptr()
70 }
71}
72
73impl Drop for AlignedPageBuf {
74 fn drop(&mut self) {
75 unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
76 }
77}
78
79#[derive(Debug)]
80pub struct DiskManager {
81 next_page_id: AtomicU32,
82 db_file: Mutex<File>,
83 pub meta: RwLock<MetaPage>,
84 direct_io: bool,
85}
86
87impl DiskManager {
88 fn open_raw_file(db_path: &Path, create: bool, direct: bool) -> std::io::Result<(File, bool)> {
89 let mut request_direct = direct
90 && !std::env::var("QUILL_DISABLE_DIRECT_IO")
91 .map_or(false, |v| v == "1" || v.eq_ignore_ascii_case("true"));
92
93 #[cfg(target_os = "linux")]
94 if request_direct {
95 let mut direct_opts = std::fs::OpenOptions::new();
96 direct_opts.read(true).write(true);
97 if create {
98 direct_opts.create(true);
99 }
100 direct_opts.custom_flags(libc::O_DIRECT | libc::O_NOATIME);
101 match direct_opts.open(db_path) {
102 Ok(file) => return Ok((file, true)),
103 Err(err) => {
104 let needs_fallback = err.kind() == ErrorKind::InvalidInput
105 || err.raw_os_error() == Some(libc::EINVAL);
106 if !needs_fallback {
107 return Err(err);
108 }
109 debug!(
110 "Direct I/O not available for {}: {}. Falling back to buffered mode",
111 db_path.display(),
112 err
113 );
114 request_direct = false;
115 }
116 }
117 }
118
119 let mut options = std::fs::OpenOptions::new();
120 options.read(true).write(true);
121 if create {
122 options.create(true);
123 }
124 #[cfg(target_os = "linux")]
125 if request_direct {
126 options.custom_flags(libc::O_NOATIME);
127 }
128
129 options.open(db_path).map(|file| {
130 #[cfg(target_os = "linux")]
131 {
132 (file, request_direct)
133 }
134 #[cfg(not(target_os = "linux"))]
135 {
136 let _ = request_direct;
137 (file, false)
138 }
139 })
140 }
141
142 pub fn try_new(db_path: impl AsRef<Path>) -> QuillSQLResult<Self> {
143 let mut is_new_file = false;
144 let db_path_ref = db_path.as_ref();
145 let (db_file, meta, direct_enabled) = if db_path_ref.exists() {
146 let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, false, true)?;
147 let meta_page = if direct_ok {
148 let mut aligned = AlignedPageBuf::new_zeroed()?;
149 match db_file.read_exact(aligned.as_mut_slice()) {
150 Ok(()) => {
151 let (meta_page, _) = decode_meta_page(aligned.as_slice())?;
152 meta_page
153 }
154 Err(err) => {
155 if err.kind() == ErrorKind::InvalidInput
156 || err.raw_os_error() == Some(libc::EINVAL)
157 {
158 let (mut fallback_file, _) =
159 Self::open_raw_file(db_path_ref, false, false)?;
160 let mut buf = vec![0; *META_PAGE_SIZE];
161 fallback_file.read_exact(&mut buf)?;
162 let (meta_page, _) = decode_meta_page(&buf)?;
163 db_file = fallback_file;
164 direct_ok = false;
165 meta_page
166 } else {
167 return Err(err.into());
168 }
169 }
170 }
171 } else {
172 let mut buf = vec![0; *META_PAGE_SIZE];
173 db_file.read_exact(&mut buf)?;
174 let (meta_page, _) = decode_meta_page(&buf)?;
175 meta_page
176 };
177 (db_file, meta_page, direct_ok)
178 } else {
179 is_new_file = true;
180 let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, true, true)?;
181 let meta_page = MetaPage::try_new()?;
182 let meta_bytes = encode_meta_page(&meta_page);
183
184 if direct_ok {
185 let mut aligned = AlignedPageBuf::new_zeroed()?;
186 aligned.as_mut_slice().copy_from_slice(&meta_bytes);
187
188 if let Err(err) = db_file.write_all(aligned.as_slice()) {
189 let mut need_fallback = err.kind() == ErrorKind::InvalidInput;
190 #[cfg(target_os = "linux")]
191 {
192 if !need_fallback && err.raw_os_error() == Some(libc::EINVAL) {
193 need_fallback = true;
194 }
195 }
196
197 if need_fallback {
198 let (mut fallback_file, _) =
199 Self::open_raw_file(db_path_ref, false, false)?;
200 fallback_file.write_all(&meta_bytes)?;
201 db_file = fallback_file;
202 direct_ok = false;
203 } else {
204 return Err(err.into());
205 }
206 }
207 } else {
208 db_file.write_all(&meta_bytes)?;
209 }
210
211 (db_file, meta_page, direct_ok)
212 };
213
214 let db_file_len = db_file.metadata()?.len();
216 if (db_file_len - *META_PAGE_SIZE as u64) % PAGE_SIZE as u64 != 0 {
217 return Err(QuillSQLError::Internal(format!(
218 "db file size not a multiple of {} + meta page size {}",
219 PAGE_SIZE, *META_PAGE_SIZE,
220 )));
221 }
222 let next_page_id =
223 (((db_file_len - *META_PAGE_SIZE as u64) / PAGE_SIZE as u64) + 1) as PageId;
224 debug!("Initialized disk_manager next_page_id: {}", next_page_id);
225
226 let disk_manager = Self {
227 next_page_id: AtomicU32::new(next_page_id),
228 db_file: Mutex::new(db_file),
231 meta: RwLock::new(meta),
232 direct_io: direct_enabled,
233 };
234
235 #[cfg(target_os = "linux")]
236 if !direct_enabled {
237 warn!("DiskManager running without O_DIRECT; expect OS page cache usage");
238 }
239
240 if is_new_file {
242 let freelist_page_id = disk_manager.allocate_freelist_page()?;
243 let information_schema_schemas_first_page_id = disk_manager.allocate_page()?;
244 let information_schema_tables_first_page_id = disk_manager.allocate_page()?;
245 let information_schema_columns_first_page_id = disk_manager.allocate_page()?;
246 let information_schema_indexes_first_page_id = disk_manager.allocate_page()?;
247
248 let mut meta = disk_manager.meta.write().unwrap();
249 meta.freelist_page_id = freelist_page_id;
250 meta.information_schema_schemas_first_page_id =
251 information_schema_schemas_first_page_id;
252 meta.information_schema_tables_first_page_id = information_schema_tables_first_page_id;
253 meta.information_schema_columns_first_page_id =
254 information_schema_columns_first_page_id;
255 meta.information_schema_indexes_first_page_id =
256 information_schema_indexes_first_page_id;
257 drop(meta);
258 disk_manager.write_meta_page()?;
259 }
260 debug!(
261 "disk_manager meta page: {:?}",
262 disk_manager.meta.read().unwrap()
263 );
264
265 Ok(disk_manager)
266 }
267
268 pub fn read_page(&self, page_id: PageId) -> QuillSQLResult<[u8; PAGE_SIZE]> {
269 if page_id == crate::buffer::INVALID_PAGE_ID {
270 return Err(QuillSQLError::Storage(
271 "read_page: invalid page id".to_string(),
272 ));
273 }
274 let mut guard = self.db_file.lock().unwrap();
275 let mut aligned = AlignedPageBuf::new_zeroed()?;
276
277 guard.seek(SeekFrom::Start(
278 (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
279 ))?;
280 guard.read_exact(aligned.as_mut_slice())?;
281
282 let mut page = [0u8; PAGE_SIZE];
283 page.copy_from_slice(aligned.as_slice());
284 Ok(page)
285 }
286
287 pub fn write_page(&self, page_id: PageId, data: &[u8]) -> QuillSQLResult<()> {
288 if page_id == crate::buffer::INVALID_PAGE_ID {
289 return Err(QuillSQLError::Storage(
290 "write_page: invalid page id".to_string(),
291 ));
292 }
293 if data.len() != PAGE_SIZE {
294 return Err(QuillSQLError::Internal(format!(
295 "Page size is not {}",
296 PAGE_SIZE
297 )));
298 }
299 let mut guard = self.db_file.lock().unwrap();
300 self.write_page_internal(&mut guard, page_id, data)
301 }
302
303 pub fn allocate_page(&self) -> QuillSQLResult<PageId> {
304 if let Some(page_id) = self.freelist_pop()? {
305 Ok(page_id)
306 } else {
307 let mut guard = self.db_file.lock().unwrap();
308
309 let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);
311
312 self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
314
315 Ok(page_id)
316 }
317 }
318
319 pub fn allocate_freelist_page(&self) -> QuillSQLResult<PageId> {
320 let page_id = self.allocate_page()?;
321 let freelist_page = FreelistPage::new();
322 self.write_page(page_id, &FreelistPageCodec::encode(&freelist_page))?;
323 Ok(page_id)
324 }
325
326 pub fn deallocate_page(&self, page_id: PageId) -> QuillSQLResult<()> {
327 if page_id == crate::buffer::INVALID_PAGE_ID {
328 return Err(QuillSQLError::Storage(
329 "deallocate_page: invalid page id".to_string(),
330 ));
331 }
332 let mut guard = self.db_file.lock().unwrap();
335 self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
336 drop(guard);
337
338 self.freelist_push(page_id)?;
339 Ok(())
340 }
341
342 fn freelist_push(&self, page_id: PageId) -> QuillSQLResult<()> {
343 let mut curr_page_id = INVALID_PAGE_ID;
344 let mut next_page_id = self.meta.read().unwrap().freelist_page_id;
345 loop {
346 let mut freelist_page = if next_page_id == INVALID_PAGE_ID {
347 next_page_id = self.allocate_freelist_page()?;
348 if curr_page_id != INVALID_PAGE_ID {
349 let (mut curr_freelist_page, _) =
350 FreelistPageCodec::decode(&self.read_page(curr_page_id)?)?;
351 curr_freelist_page.header.next_page_id = next_page_id;
352 self.write_page(
353 curr_page_id,
354 &FreelistPageCodec::encode(&curr_freelist_page),
355 )?;
356 }
357
358 FreelistPage::new()
359 } else {
360 let (freelist_page, _) = FreelistPageCodec::decode(&self.read_page(next_page_id)?)?;
361 freelist_page
362 };
363
364 if freelist_page.is_full() {
365 curr_page_id = next_page_id;
366 next_page_id = freelist_page.header.next_page_id;
367 } else {
368 freelist_page.push(page_id);
369 self.write_page(next_page_id, &FreelistPageCodec::encode(&freelist_page))?;
371 break;
372 }
373 }
374 Ok(())
375 }
376
377 fn freelist_pop(&self) -> QuillSQLResult<Option<PageId>> {
378 let mut freelist_page_id = self.meta.read().unwrap().freelist_page_id;
379 loop {
380 if freelist_page_id != INVALID_PAGE_ID {
381 let (mut freelist_page, _) =
382 FreelistPageCodec::decode(&self.read_page(freelist_page_id)?)?;
383 if let Some(page_id) = freelist_page.pop() {
384 self.write_page(freelist_page_id, &FreelistPageCodec::encode(&freelist_page))?;
385 return Ok(Some(page_id));
386 } else {
387 freelist_page_id = freelist_page.header.next_page_id;
388 }
389 } else {
390 return Ok(None);
391 }
392 }
393 }
394
395 fn write_meta_page(&self) -> QuillSQLResult<()> {
396 let mut guard = self.db_file.lock().unwrap();
397 guard.seek(std::io::SeekFrom::Start(0))?;
398 let encoded = encode_meta_page(&self.meta.read().unwrap());
399 if self.direct_io && encoded.as_ptr() as usize % PAGE_SIZE != 0 {
400 let mut aligned = AlignedPageBuf::new_zeroed()?;
401 aligned.as_mut_slice().copy_from_slice(&encoded);
402 guard.write_all(aligned.as_slice())?;
403 } else {
404 guard.write_all(&encoded)?;
405 }
406 if !self.direct_io {
407 guard.flush()?;
408 }
409 Ok(())
410 }
411
412 fn write_page_internal(
413 &self,
414 guard: &mut MutexGuard<File>,
415 page_id: PageId,
416 data: &[u8],
417 ) -> QuillSQLResult<()> {
418 guard.seek(SeekFrom::Start(
420 (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
421 ))?;
422
423 if data.as_ptr() as usize % PAGE_SIZE == 0 {
424 guard.write_all(data)?;
425 } else {
426 let mut aligned = AlignedPageBuf::new_zeroed()?;
427 aligned.as_mut_slice().copy_from_slice(data);
428 guard.write_all(aligned.as_slice())?;
429 }
430 if !self.direct_io {
431 guard.flush()?;
432 }
433 Ok(())
434 }
435
436 pub fn db_file_len(&self) -> QuillSQLResult<u64> {
437 let guard = self.db_file.lock().unwrap();
438 let meta = guard.metadata()?;
439 Ok(meta.len())
440 }
441
442 #[cfg(target_os = "linux")]
443 pub fn try_clone_db_file(&self) -> QuillSQLResult<File> {
444 let guard = self.db_file.lock().unwrap();
445 let cloned = guard.try_clone()?;
446 Ok(cloned)
447 }
448}
449
#[cfg(test)]
mod tests {
    use crate::buffer::PAGE_SIZE;
    use tempfile::TempDir;

    /// Pages written through the manager read back unchanged and the file
    /// grows by one page per fresh allocation.
    #[test]
    pub fn test_disk_manager_write_read_page() {
        let dir = TempDir::new().unwrap();
        let manager = super::DiskManager::try_new(dir.path().join("test.db")).unwrap();

        // Bootstrapping a new file claims five pages, so the next id is 6.
        let first_id = manager.allocate_page().unwrap();
        assert_eq!(first_id, 6);
        let mut first_payload = vec![0u8; PAGE_SIZE];
        first_payload[..3].copy_from_slice(&[1, 2, 3]);
        manager.write_page(first_id, &first_payload).unwrap();
        let read_back = manager.read_page(first_id).unwrap();
        assert_eq!(read_back, first_payload.as_slice());

        let second_id = manager.allocate_page().unwrap();
        assert_eq!(second_id, 7);
        let mut second_payload = vec![0u8; PAGE_SIZE];
        second_payload[PAGE_SIZE - 3..].copy_from_slice(&[4, 5, 6]);
        manager.write_page(second_id, &second_payload).unwrap();
        let read_back = manager.read_page(second_id).unwrap();
        assert_eq!(read_back, second_payload.as_slice());

        // Seven data pages plus the meta page.
        let file_len = manager.db_file_len().unwrap();
        assert_eq!(file_len as usize, PAGE_SIZE * 7 + PAGE_SIZE);
    }

    /// A deallocated page id is the next one handed out again.
    #[test]
    pub fn test_disk_manager_freelist() {
        let dir = TempDir::new().unwrap();
        let manager = super::DiskManager::try_new(dir.path().join("test.db")).unwrap();

        let freed_id = manager.allocate_page().unwrap();
        let _kept_a = manager.allocate_page().unwrap();
        let _kept_b = manager.allocate_page().unwrap();

        manager.deallocate_page(freed_id).unwrap();

        let reused_id = manager.allocate_page().unwrap();
        assert_eq!(freed_id, reused_id);
    }
}