1use log::{debug, warn};
2use std::alloc::{alloc, alloc_zeroed, dealloc, Layout};
3use std::fs::File;
4use std::path::Path;
5use std::ptr::NonNull;
6use std::slice;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::RwLock;
9use std::{
10 io::{ErrorKind, Read, Seek, SeekFrom, Write},
11 sync::{Mutex, MutexGuard},
12};
13
14#[cfg(target_os = "linux")]
15use std::os::unix::fs::OpenOptionsExt;
16
17use crate::error::{QuillSQLError, QuillSQLResult};
18
19use crate::buffer::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
20use crate::storage::codec::FreelistPageCodec;
21use crate::storage::page::FreelistPage;
22use crate::storage::page::{decode_meta_page, encode_meta_page, MetaPage, META_PAGE_SIZE};
23
24static EMPTY_PAGE: [u8; PAGE_SIZE] = [0; PAGE_SIZE];
25
/// Heap buffer of exactly one page, aligned to the page size.
///
/// The memory is owned by this value and released on drop. The alignment lets the
/// buffer be used for reads and writes on a file opened with O_DIRECT.
pub(crate) struct AlignedPageBuf {
    /// Start of the owned allocation (never null).
    ptr: NonNull<u8>,
    /// Layout the allocation was made with; it must be passed back to `dealloc`.
    layout: Layout,
}
31
32impl AlignedPageBuf {
33 pub(crate) fn new_zeroed() -> QuillSQLResult<Self> {
34 Self::allocate(true)
35 }
36
37 #[allow(dead_code)]
38 pub(crate) fn new_uninit() -> QuillSQLResult<Self> {
39 Self::allocate(false)
40 }
41
42 fn allocate(zeroed: bool) -> QuillSQLResult<Self> {
43 let layout = Layout::from_size_align(PAGE_SIZE, PAGE_SIZE)
44 .map_err(|_| QuillSQLError::Internal("Invalid PAGE_SIZE layout".into()))?;
45 let ptr = unsafe {
46 if zeroed {
47 alloc_zeroed(layout)
48 } else {
49 alloc(layout)
50 }
51 };
52 let Some(non_null) = NonNull::new(ptr) else {
53 return Err(QuillSQLError::Internal("Aligned allocation failed".into()));
54 };
55 Ok(Self {
56 ptr: non_null,
57 layout,
58 })
59 }
60
61 pub(crate) fn as_slice(&self) -> &[u8] {
62 unsafe { slice::from_raw_parts(self.ptr.as_ptr(), PAGE_SIZE) }
63 }
64
65 pub(crate) fn as_mut_slice(&mut self) -> &mut [u8] {
66 unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), PAGE_SIZE) }
67 }
68
69 pub(crate) fn ptr(&self) -> *mut u8 {
70 self.ptr.as_ptr()
71 }
72}
73
74impl Drop for AlignedPageBuf {
75 fn drop(&mut self) {
76 unsafe { dealloc(self.ptr.as_ptr(), self.layout) };
77 }
78}
79
80#[derive(Debug)]
81pub struct DiskManager {
82 next_page_id: AtomicU32,
83 db_file: Mutex<File>,
84 pub meta: RwLock<MetaPage>,
85 direct_io: bool,
86}
87
88impl DiskManager {
89 fn open_raw_file(db_path: &Path, create: bool, direct: bool) -> std::io::Result<(File, bool)> {
90 let mut request_direct = direct
91 && !std::env::var("QUILL_DISABLE_DIRECT_IO")
92 .map_or(false, |v| v == "1" || v.eq_ignore_ascii_case("true"));
93
94 #[cfg(target_os = "linux")]
95 if request_direct {
96 let mut direct_opts = std::fs::OpenOptions::new();
97 direct_opts.read(true).write(true);
98 if create {
99 direct_opts.create(true);
100 }
101 direct_opts.custom_flags(libc::O_DIRECT | libc::O_NOATIME);
102 match direct_opts.open(db_path) {
103 Ok(file) => return Ok((file, true)),
104 Err(err) => {
105 let needs_fallback = err.kind() == ErrorKind::InvalidInput
106 || err.raw_os_error() == Some(libc::EINVAL);
107 if !needs_fallback {
108 return Err(err);
109 }
110 debug!(
111 "Direct I/O not available for {}: {}. Falling back to buffered mode",
112 db_path.display(),
113 err
114 );
115 request_direct = false;
116 }
117 }
118 }
119
120 let mut options = std::fs::OpenOptions::new();
121 options.read(true).write(true);
122 if create {
123 options.create(true);
124 }
125 #[cfg(target_os = "linux")]
126 if request_direct {
127 options.custom_flags(libc::O_NOATIME);
128 }
129
130 options.open(db_path).map(|file| {
131 #[cfg(target_os = "linux")]
132 {
133 (file, request_direct)
134 }
135 #[cfg(not(target_os = "linux"))]
136 {
137 let _ = request_direct;
138 (file, false)
139 }
140 })
141 }
142
143 pub fn try_new(db_path: impl AsRef<Path>) -> QuillSQLResult<Self> {
144 let mut is_new_file = false;
145 let db_path_ref = db_path.as_ref();
146 let (db_file, meta, direct_enabled) = if db_path_ref.exists() {
147 let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, false, true)?;
148 let meta_page = if direct_ok {
149 let mut aligned = AlignedPageBuf::new_zeroed()?;
150 match db_file.read_exact(aligned.as_mut_slice()) {
151 Ok(()) => {
152 let (meta_page, _) = decode_meta_page(aligned.as_slice())?;
153 meta_page
154 }
155 Err(err) => {
156 if err.kind() == ErrorKind::InvalidInput
157 || err.raw_os_error() == Some(libc::EINVAL)
158 {
159 let (mut fallback_file, _) =
160 Self::open_raw_file(db_path_ref, false, false)?;
161 let mut buf = vec![0; *META_PAGE_SIZE];
162 fallback_file.read_exact(&mut buf)?;
163 let (meta_page, _) = decode_meta_page(&buf)?;
164 db_file = fallback_file;
165 direct_ok = false;
166 meta_page
167 } else {
168 return Err(err.into());
169 }
170 }
171 }
172 } else {
173 let mut buf = vec![0; *META_PAGE_SIZE];
174 db_file.read_exact(&mut buf)?;
175 let (meta_page, _) = decode_meta_page(&buf)?;
176 meta_page
177 };
178 (db_file, meta_page, direct_ok)
179 } else {
180 is_new_file = true;
181 let (mut db_file, mut direct_ok) = Self::open_raw_file(db_path_ref, true, true)?;
182 let meta_page = MetaPage::try_new()?;
183 let meta_bytes = encode_meta_page(&meta_page);
184
185 if direct_ok {
186 let mut aligned = AlignedPageBuf::new_zeroed()?;
187 aligned.as_mut_slice().copy_from_slice(&meta_bytes);
188
189 if let Err(err) = db_file.write_all(aligned.as_slice()) {
190 let mut need_fallback = err.kind() == ErrorKind::InvalidInput;
191 #[cfg(target_os = "linux")]
192 {
193 if !need_fallback && err.raw_os_error() == Some(libc::EINVAL) {
194 need_fallback = true;
195 }
196 }
197
198 if need_fallback {
199 let (mut fallback_file, _) =
200 Self::open_raw_file(db_path_ref, false, false)?;
201 fallback_file.write_all(&meta_bytes)?;
202 db_file = fallback_file;
203 direct_ok = false;
204 } else {
205 return Err(err.into());
206 }
207 }
208 } else {
209 db_file.write_all(&meta_bytes)?;
210 }
211
212 (db_file, meta_page, direct_ok)
213 };
214
215 let db_file_len = db_file.metadata()?.len();
217 if (db_file_len - *META_PAGE_SIZE as u64) % PAGE_SIZE as u64 != 0 {
218 return Err(QuillSQLError::Internal(format!(
219 "db file size not a multiple of {} + meta page size {}",
220 PAGE_SIZE, *META_PAGE_SIZE,
221 )));
222 }
223 let next_page_id =
224 (((db_file_len - *META_PAGE_SIZE as u64) / PAGE_SIZE as u64) + 1) as PageId;
225 debug!("Initialized disk_manager next_page_id: {}", next_page_id);
226
227 let disk_manager = Self {
228 next_page_id: AtomicU32::new(next_page_id),
229 db_file: Mutex::new(db_file),
232 meta: RwLock::new(meta),
233 direct_io: direct_enabled,
234 };
235
236 #[cfg(target_os = "linux")]
237 if !direct_enabled {
238 warn!("DiskManager running without O_DIRECT; expect OS page cache usage");
239 }
240
241 if is_new_file {
243 let freelist_page_id = disk_manager.allocate_freelist_page()?;
244 let information_schema_schemas_first_page_id = disk_manager.allocate_page()?;
245 let information_schema_tables_first_page_id = disk_manager.allocate_page()?;
246 let information_schema_columns_first_page_id = disk_manager.allocate_page()?;
247 let information_schema_indexes_first_page_id = disk_manager.allocate_page()?;
248
249 let mut meta = disk_manager.meta.write().unwrap();
250 meta.freelist_page_id = freelist_page_id;
251 meta.information_schema_schemas_first_page_id =
252 information_schema_schemas_first_page_id;
253 meta.information_schema_tables_first_page_id = information_schema_tables_first_page_id;
254 meta.information_schema_columns_first_page_id =
255 information_schema_columns_first_page_id;
256 meta.information_schema_indexes_first_page_id =
257 information_schema_indexes_first_page_id;
258 drop(meta);
259 disk_manager.write_meta_page()?;
260 }
261 debug!(
262 "disk_manager meta page: {:?}",
263 disk_manager.meta.read().unwrap()
264 );
265
266 Ok(disk_manager)
267 }
268
269 pub fn read_page(&self, page_id: PageId) -> QuillSQLResult<[u8; PAGE_SIZE]> {
270 if page_id == crate::buffer::INVALID_PAGE_ID {
271 return Err(QuillSQLError::Storage(
272 "read_page: invalid page id".to_string(),
273 ));
274 }
275 let mut guard = self.db_file.lock().unwrap();
276 let mut aligned = AlignedPageBuf::new_zeroed()?;
277
278 guard.seek(SeekFrom::Start(
279 (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
280 ))?;
281 guard.read_exact(aligned.as_mut_slice())?;
282
283 let mut page = [0u8; PAGE_SIZE];
284 page.copy_from_slice(aligned.as_slice());
285 Ok(page)
286 }
287
288 pub fn write_page(&self, page_id: PageId, data: &[u8]) -> QuillSQLResult<()> {
289 if page_id == crate::buffer::INVALID_PAGE_ID {
290 return Err(QuillSQLError::Storage(
291 "write_page: invalid page id".to_string(),
292 ));
293 }
294 if data.len() != PAGE_SIZE {
295 return Err(QuillSQLError::Internal(format!(
296 "Page size is not {}",
297 PAGE_SIZE
298 )));
299 }
300 let mut guard = self.db_file.lock().unwrap();
301 self.write_page_internal(&mut guard, page_id, data)
302 }
303
304 pub fn allocate_page(&self) -> QuillSQLResult<PageId> {
305 if let Some(page_id) = self.freelist_pop()? {
306 Ok(page_id)
307 } else {
308 let mut guard = self.db_file.lock().unwrap();
309
310 let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);
312
313 self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
315
316 Ok(page_id)
317 }
318 }
319
320 pub fn allocate_freelist_page(&self) -> QuillSQLResult<PageId> {
321 let page_id = self.allocate_page()?;
322 let freelist_page = FreelistPage::new();
323 self.write_page(page_id, &FreelistPageCodec::encode(&freelist_page))?;
324 Ok(page_id)
325 }
326
327 pub fn deallocate_page(&self, page_id: PageId) -> QuillSQLResult<()> {
328 if page_id == crate::buffer::INVALID_PAGE_ID {
329 return Err(QuillSQLError::Storage(
330 "deallocate_page: invalid page id".to_string(),
331 ));
332 }
333 let mut guard = self.db_file.lock().unwrap();
336 self.write_page_internal(&mut guard, page_id, &EMPTY_PAGE)?;
337 drop(guard);
338
339 self.freelist_push(page_id)?;
340 Ok(())
341 }
342
343 fn freelist_push(&self, page_id: PageId) -> QuillSQLResult<()> {
344 let mut curr_page_id = INVALID_PAGE_ID;
345 let mut next_page_id = self.meta.read().unwrap().freelist_page_id;
346 loop {
347 let mut freelist_page = if next_page_id == INVALID_PAGE_ID {
348 next_page_id = self.allocate_freelist_page()?;
349 if curr_page_id != INVALID_PAGE_ID {
350 let (mut curr_freelist_page, _) =
351 FreelistPageCodec::decode(&self.read_page(curr_page_id)?)?;
352 curr_freelist_page.header.next_page_id = next_page_id;
353 self.write_page(
354 curr_page_id,
355 &FreelistPageCodec::encode(&curr_freelist_page),
356 )?;
357 }
358
359 FreelistPage::new()
360 } else {
361 let (freelist_page, _) = FreelistPageCodec::decode(&self.read_page(next_page_id)?)?;
362 freelist_page
363 };
364
365 if freelist_page.is_full() {
366 curr_page_id = next_page_id;
367 next_page_id = freelist_page.header.next_page_id;
368 } else {
369 freelist_page.push(page_id);
370 self.write_page(next_page_id, &FreelistPageCodec::encode(&freelist_page))?;
372 break;
373 }
374 }
375 Ok(())
376 }
377
378 fn freelist_pop(&self) -> QuillSQLResult<Option<PageId>> {
379 let mut freelist_page_id = self.meta.read().unwrap().freelist_page_id;
380 loop {
381 if freelist_page_id != INVALID_PAGE_ID {
382 let (mut freelist_page, _) =
383 FreelistPageCodec::decode(&self.read_page(freelist_page_id)?)?;
384 if let Some(page_id) = freelist_page.pop() {
385 self.write_page(freelist_page_id, &FreelistPageCodec::encode(&freelist_page))?;
386 return Ok(Some(page_id));
387 } else {
388 freelist_page_id = freelist_page.header.next_page_id;
389 }
390 } else {
391 return Ok(None);
392 }
393 }
394 }
395
396 fn write_meta_page(&self) -> QuillSQLResult<()> {
397 let mut guard = self.db_file.lock().unwrap();
398 guard.seek(std::io::SeekFrom::Start(0))?;
399 let encoded = encode_meta_page(&self.meta.read().unwrap());
400 if self.direct_io && encoded.as_ptr() as usize % PAGE_SIZE != 0 {
401 let mut aligned = AlignedPageBuf::new_zeroed()?;
402 aligned.as_mut_slice().copy_from_slice(&encoded);
403 guard.write_all(aligned.as_slice())?;
404 } else {
405 guard.write_all(&encoded)?;
406 }
407 if !self.direct_io {
408 guard.flush()?;
409 }
410 Ok(())
411 }
412
413 fn write_page_internal(
414 &self,
415 guard: &mut MutexGuard<File>,
416 page_id: PageId,
417 data: &[u8],
418 ) -> QuillSQLResult<()> {
419 guard.seek(SeekFrom::Start(
421 (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64,
422 ))?;
423
424 if data.as_ptr() as usize % PAGE_SIZE == 0 {
425 guard.write_all(data)?;
426 } else {
427 let mut aligned = AlignedPageBuf::new_zeroed()?;
428 aligned.as_mut_slice().copy_from_slice(data);
429 guard.write_all(aligned.as_slice())?;
430 }
431 if !self.direct_io {
432 guard.flush()?;
433 }
434 Ok(())
435 }
436
437 pub fn db_file_len(&self) -> QuillSQLResult<u64> {
438 let guard = self.db_file.lock().unwrap();
439 let meta = guard.metadata()?;
440 Ok(meta.len())
441 }
442
443 #[cfg(target_os = "linux")]
444 pub fn try_clone_db_file(&self) -> QuillSQLResult<File> {
445 let guard = self.db_file.lock().unwrap();
446 let cloned = guard.try_clone()?;
447 Ok(cloned)
448 }
449}
450
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use crate::buffer::PAGE_SIZE;
    use tempfile::TempDir;

    /// Opens a `DiskManager` over `test.db` in a fresh temporary directory.
    /// The `TempDir` is returned as well, so the directory outlives the manager.
    fn open_fresh() -> (TempDir, DiskManager) {
        let dir = TempDir::new().unwrap();
        let manager = DiskManager::try_new(dir.path().join("test.db")).unwrap();
        (dir, manager)
    }

    /// Builds one full page of zeros, with `bytes` copied in starting at `offset`.
    fn page_with(offset: usize, bytes: &[u8]) -> Vec<u8> {
        let mut page = vec![0u8; PAGE_SIZE];
        page[offset..offset + bytes.len()].copy_from_slice(bytes);
        page
    }

    #[test]
    pub fn test_disk_manager_write_read_page() {
        let (_dir, disk_manager) = open_fresh();

        // A new file already uses pages 1..=5: the freelist page and the four
        // information_schema first pages.
        let first = disk_manager.allocate_page().unwrap();
        assert_eq!(first, 6);
        let first_data = page_with(0, &[1, 2, 3]);
        disk_manager.write_page(first, &first_data).unwrap();
        assert_eq!(disk_manager.read_page(first).unwrap(), first_data.as_slice());

        let second = disk_manager.allocate_page().unwrap();
        assert_eq!(second, 7);
        let second_data = page_with(PAGE_SIZE - 3, &[4, 5, 6]);
        disk_manager.write_page(second, &second_data).unwrap();
        assert_eq!(disk_manager.read_page(second).unwrap(), second_data.as_slice());

        // Seven allocated pages plus the meta page; the expected length assumes the
        // meta page occupies PAGE_SIZE bytes.
        let file_len = disk_manager.db_file_len().unwrap() as usize;
        assert_eq!(file_len, PAGE_SIZE * 7 + PAGE_SIZE);
    }

    #[test]
    pub fn test_disk_manager_freelist() {
        let (_dir, disk_manager) = open_fresh();

        let freed = disk_manager.allocate_page().unwrap();
        for _ in 0..2 {
            disk_manager.allocate_page().unwrap();
        }

        disk_manager.deallocate_page(freed).unwrap();

        // The next allocation recycles the freed page.
        let reused = disk_manager.allocate_page().unwrap();
        assert_eq!(freed, reused);
    }
}
499}