sochdb_storage/
page_manager.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Page-Based File Layout with Database Header (Task 8)
16//!
17//! Implements proper database file format with:
18//! - Magic header on page 0
19//! - Schema catalog page reference
20//! - Free list management
21//! - O(1) page allocation
22//!
23//! ## File Layout
24//!
25//! ```text
26//! Page 0: DbHeader (128 bytes reserved)
27//! Page 1: Schema Catalog (root of catalog B-tree)
28//! Page 2+: Data pages (column groups, indexes)
29//! ```
30//!
31//! ## Space Amplification Target
32//!
33//! SA = file_size / logical_data_size < 1.3 (30% overhead)
34
35use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
36use parking_lot::RwLock;
37use serde::{Deserialize, Serialize};
38use std::fs::{File, OpenOptions};
39use std::io::{self, Read, Seek, SeekFrom, Write};
40use std::path::{Path, PathBuf};
41use std::sync::atomic::{AtomicU64, Ordering};
42
/// Page size used when the caller has no specific requirement: 4 KiB.
pub const DEFAULT_PAGE_SIZE: u32 = 4 * 1024;

/// File signature stored in the first four bytes of page 0 (ASCII `TOON`).
pub const SOCHDB_MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];

/// On-disk format version written into newly created headers.
pub const FORMAT_VERSION: u32 = 1;

/// Identifier of a page within the database file (page N starts at byte
/// `N * page_size`).
pub type PageId = u64;
54
55/// Database header (stored on page 0)
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct DbHeader {
58    /// Magic bytes "TOON"
59    pub magic: [u8; 4],
60    /// Format version
61    pub version: u32,
62    /// Page size in bytes
63    pub page_size: u32,
64    /// Page ID of schema catalog root
65    pub schema_page: PageId,
66    /// First free page ID (head of free list)
67    pub free_list_head: PageId,
68    /// Total number of allocated pages
69    pub total_pages: u64,
70    /// Database creation timestamp (microseconds)
71    pub created_us: u64,
72    /// Last modified timestamp (microseconds)
73    pub modified_us: u64,
74    /// Header checksum
75    pub checksum: u32,
76}
77
78impl DbHeader {
79    /// Size of header in bytes
80    pub const SIZE: usize = 128;
81
82    /// Create a new database header
83    pub fn new(page_size: u32) -> Self {
84        let now = now_micros();
85        let mut header = Self {
86            magic: SOCHDB_MAGIC,
87            version: FORMAT_VERSION,
88            page_size,
89            schema_page: 1,    // Page 1 is always schema catalog
90            free_list_head: 0, // No free pages initially
91            total_pages: 2,    // Header + Schema catalog
92            created_us: now,
93            modified_us: now,
94            checksum: 0,
95        };
96        header.checksum = header.compute_checksum();
97        header
98    }
99
100    /// Compute checksum (CRC32 of all fields except checksum)
101    fn compute_checksum(&self) -> u32 {
102        let mut hasher = crc32fast::Hasher::new();
103        hasher.update(&self.magic);
104        hasher.update(&self.version.to_le_bytes());
105        hasher.update(&self.page_size.to_le_bytes());
106        hasher.update(&self.schema_page.to_le_bytes());
107        hasher.update(&self.free_list_head.to_le_bytes());
108        hasher.update(&self.total_pages.to_le_bytes());
109        hasher.update(&self.created_us.to_le_bytes());
110        hasher.update(&self.modified_us.to_le_bytes());
111        hasher.finalize()
112    }
113
114    /// Validate checksum
115    pub fn validate(&self) -> bool {
116        self.magic == SOCHDB_MAGIC && self.checksum == self.compute_checksum()
117    }
118
119    /// Serialize to bytes
120    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
121        let mut buf = [0u8; Self::SIZE];
122        let mut cursor = io::Cursor::new(&mut buf[..]);
123
124        cursor.write_all(&self.magic).unwrap();
125        cursor.write_u32::<LittleEndian>(self.version).unwrap();
126        cursor.write_u32::<LittleEndian>(self.page_size).unwrap();
127        cursor.write_u64::<LittleEndian>(self.schema_page).unwrap();
128        cursor
129            .write_u64::<LittleEndian>(self.free_list_head)
130            .unwrap();
131        cursor.write_u64::<LittleEndian>(self.total_pages).unwrap();
132        cursor.write_u64::<LittleEndian>(self.created_us).unwrap();
133        cursor.write_u64::<LittleEndian>(self.modified_us).unwrap();
134        cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
135
136        buf
137    }
138
139    /// Deserialize from bytes
140    pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
141        if buf.len() < Self::SIZE {
142            return Err(io::Error::new(
143                io::ErrorKind::InvalidData,
144                "Buffer too short for header",
145            ));
146        }
147
148        let mut cursor = io::Cursor::new(buf);
149        let mut magic = [0u8; 4];
150        cursor.read_exact(&mut magic)?;
151
152        let version = cursor.read_u32::<LittleEndian>()?;
153        let page_size = cursor.read_u32::<LittleEndian>()?;
154        let schema_page = cursor.read_u64::<LittleEndian>()?;
155        let free_list_head = cursor.read_u64::<LittleEndian>()?;
156        let total_pages = cursor.read_u64::<LittleEndian>()?;
157        let created_us = cursor.read_u64::<LittleEndian>()?;
158        let modified_us = cursor.read_u64::<LittleEndian>()?;
159        let checksum = cursor.read_u32::<LittleEndian>()?;
160
161        Ok(Self {
162            magic,
163            version,
164            page_size,
165            schema_page,
166            free_list_head,
167            total_pages,
168            created_us,
169            modified_us,
170            checksum,
171        })
172    }
173}
174
175/// Free page header (linked list node)
176#[derive(Debug, Clone)]
177pub struct FreePageHeader {
178    /// Next free page ID (0 = end of list)
179    pub next_free: PageId,
180    /// Number of contiguous free pages (for coalescing)
181    pub count: u32,
182}
183
184impl FreePageHeader {
185    /// Size in bytes
186    pub const SIZE: usize = 12;
187
188    /// Serialize to bytes
189    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
190        let mut buf = [0u8; Self::SIZE];
191        let mut cursor = io::Cursor::new(&mut buf[..]);
192        cursor.write_u64::<LittleEndian>(self.next_free).unwrap();
193        cursor.write_u32::<LittleEndian>(self.count).unwrap();
194        buf
195    }
196
197    /// Deserialize from bytes
198    pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
199        let mut cursor = io::Cursor::new(buf);
200        Ok(Self {
201            next_free: cursor.read_u64::<LittleEndian>()?,
202            count: cursor.read_u32::<LittleEndian>()?,
203        })
204    }
205}
206
/// Page type markers.
///
/// `#[repr(u8)]` together with the explicit discriminants fixes the numeric
/// value of every variant, so existing values must not be renumbered.
// NOTE(review): nothing in this file reads or writes these tags; presumably
// they are stored as a one-byte marker by the page formats defined elsewhere
// - confirm before relying on that.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Database header (page 0 only)
    Header = 0,
    /// Schema catalog
    Catalog = 1,
    /// Column group data
    ColumnGroup = 2,
    /// Index interior node
    IndexInterior = 3,
    /// Index leaf node
    IndexLeaf = 4,
    /// Free page
    Free = 5,
    /// Overflow page (for large values)
    Overflow = 6,
}
226
/// Statistics for page manager.
///
/// A point-in-time snapshot returned by `PageManager::stats`.
#[derive(Debug, Clone, Default)]
pub struct PageManagerStats {
    /// Total pages allocated (the header's `total_pages`, which includes the
    /// header and catalog pages)
    pub total_pages: u64,
    /// Pages currently in use (`total_pages` minus `free_pages`)
    pub used_pages: u64,
    /// Free pages, counted by walking the free list
    pub free_pages: u64,
    /// Page allocations counted by this manager instance; the counter starts
    /// at zero on `create`/`open` and is not persisted
    pub allocations: u64,
    /// Page deallocations counted by this manager instance; the counter starts
    /// at zero on `create`/`open` and is not persisted
    pub deallocations: u64,
    /// Page size in bytes
    pub page_size: u32,
    /// Total file size in bytes, as reported by the file metadata
    pub file_size: u64,
    /// Space amplification ratio: `file_size / (used_pages * page_size)`,
    /// reported as 1.0 when no pages are in use
    pub space_amplification: f64,
}
247
248/// Page manager for database file
249pub struct PageManager {
250    /// Database file path
251    path: PathBuf,
252    /// Database file handle
253    file: RwLock<File>,
254    /// Database header
255    header: RwLock<DbHeader>,
256    /// Page size
257    page_size: u32,
258    /// Stats counters
259    allocations: AtomicU64,
260    deallocations: AtomicU64,
261}
262
263impl PageManager {
264    /// Create a new database file
265    pub fn create<P: AsRef<Path>>(path: P, page_size: u32) -> io::Result<Self> {
266        let path = path.as_ref().to_path_buf();
267
268        // Create and initialize file
269        let mut file = OpenOptions::new()
270            .create(true)
271            .read(true)
272            .write(true)
273            .truncate(true)
274            .open(&path)?;
275
276        // Write header on page 0
277        let header = DbHeader::new(page_size);
278        let header_bytes = header.to_bytes();
279        file.write_all(&header_bytes)?;
280
281        // Pad to page size
282        let padding = vec![0u8; page_size as usize - DbHeader::SIZE];
283        file.write_all(&padding)?;
284
285        // Initialize page 1 (schema catalog)
286        let catalog_page = vec![0u8; page_size as usize];
287        file.write_all(&catalog_page)?;
288
289        file.sync_all()?;
290
291        Ok(Self {
292            path,
293            file: RwLock::new(file),
294            header: RwLock::new(header),
295            page_size,
296            allocations: AtomicU64::new(0),
297            deallocations: AtomicU64::new(0),
298        })
299    }
300
301    /// Open an existing database file
302    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
303        let path = path.as_ref().to_path_buf();
304
305        let mut file = OpenOptions::new().read(true).write(true).open(&path)?;
306
307        // Read header
308        let mut header_buf = [0u8; DbHeader::SIZE];
309        file.seek(SeekFrom::Start(0))?;
310        file.read_exact(&mut header_buf)?;
311
312        let header = DbHeader::from_bytes(&header_buf)?;
313
314        if !header.validate() {
315            return Err(io::Error::new(
316                io::ErrorKind::InvalidData,
317                "Invalid database header or checksum",
318            ));
319        }
320
321        let page_size = header.page_size;
322
323        Ok(Self {
324            path,
325            file: RwLock::new(file),
326            header: RwLock::new(header),
327            page_size,
328            allocations: AtomicU64::new(0),
329            deallocations: AtomicU64::new(0),
330        })
331    }
332
333    /// Allocate a new page - O(1) amortized
334    ///
335    /// First tries to pop from free list, otherwise extends file
336    pub fn allocate_page(&self) -> io::Result<PageId> {
337        let mut header = self.header.write();
338        let mut file = self.file.write();
339
340        self.allocations.fetch_add(1, Ordering::Relaxed);
341
342        if header.free_list_head != 0 {
343            // Pop from free list
344            let page_id = header.free_list_head;
345
346            // Read free page header to get next
347            let offset = page_id * self.page_size as u64;
348            file.seek(SeekFrom::Start(offset))?;
349
350            let mut free_header_buf = [0u8; FreePageHeader::SIZE];
351            file.read_exact(&mut free_header_buf)?;
352            let free_header = FreePageHeader::from_bytes(&free_header_buf)?;
353
354            // Update free list head
355            header.free_list_head = free_header.next_free;
356            header.modified_us = now_micros();
357            header.checksum = header.compute_checksum();
358
359            // Write updated header
360            file.seek(SeekFrom::Start(0))?;
361            file.write_all(&header.to_bytes())?;
362
363            Ok(page_id)
364        } else {
365            // Extend file
366            let page_id = header.total_pages;
367            header.total_pages += 1;
368            header.modified_us = now_micros();
369            header.checksum = header.compute_checksum();
370
371            // Write updated header
372            file.seek(SeekFrom::Start(0))?;
373            file.write_all(&header.to_bytes())?;
374
375            // Extend file with zeroed page
376            let offset = page_id * self.page_size as u64;
377            file.seek(SeekFrom::Start(offset))?;
378            let zero_page = vec![0u8; self.page_size as usize];
379            file.write_all(&zero_page)?;
380
381            Ok(page_id)
382        }
383    }
384
385    /// Deallocate a page - O(1)
386    ///
387    /// Adds page to head of free list
388    pub fn deallocate_page(&self, page_id: PageId) -> io::Result<()> {
389        if page_id < 2 {
390            return Err(io::Error::new(
391                io::ErrorKind::InvalidInput,
392                "Cannot deallocate header or catalog pages",
393            ));
394        }
395
396        let mut header = self.header.write();
397        let mut file = self.file.write();
398
399        self.deallocations.fetch_add(1, Ordering::Relaxed);
400
401        // Create free page header
402        let free_header = FreePageHeader {
403            next_free: header.free_list_head,
404            count: 1,
405        };
406
407        // Write free page header
408        let offset = page_id * self.page_size as u64;
409        file.seek(SeekFrom::Start(offset))?;
410        file.write_all(&free_header.to_bytes())?;
411
412        // Update database header
413        header.free_list_head = page_id;
414        header.modified_us = now_micros();
415        header.checksum = header.compute_checksum();
416
417        file.seek(SeekFrom::Start(0))?;
418        file.write_all(&header.to_bytes())?;
419
420        Ok(())
421    }
422
423    /// Read a page
424    pub fn read_page(&self, page_id: PageId) -> io::Result<Vec<u8>> {
425        let mut file = self.file.write();
426        let offset = page_id * self.page_size as u64;
427
428        file.seek(SeekFrom::Start(offset))?;
429        let mut buf = vec![0u8; self.page_size as usize];
430        file.read_exact(&mut buf)?;
431
432        Ok(buf)
433    }
434
435    /// Write a page
436    pub fn write_page(&self, page_id: PageId, data: &[u8]) -> io::Result<()> {
437        if data.len() != self.page_size as usize {
438            return Err(io::Error::new(
439                io::ErrorKind::InvalidInput,
440                format!("Page data must be exactly {} bytes", self.page_size),
441            ));
442        }
443
444        let mut file = self.file.write();
445        let offset = page_id * self.page_size as u64;
446
447        file.seek(SeekFrom::Start(offset))?;
448        file.write_all(data)?;
449
450        // Update modified time
451        {
452            let mut header = self.header.write();
453            header.modified_us = now_micros();
454            header.checksum = header.compute_checksum();
455            file.seek(SeekFrom::Start(0))?;
456            file.write_all(&header.to_bytes())?;
457        }
458
459        Ok(())
460    }
461
462    /// Sync all changes to disk
463    pub fn sync(&self) -> io::Result<()> {
464        self.file.read().sync_all()
465    }
466
467    /// Get page size
468    pub fn page_size(&self) -> u32 {
469        self.page_size
470    }
471
472    /// Get total pages
473    pub fn total_pages(&self) -> u64 {
474        self.header.read().total_pages
475    }
476
477    /// Get database file path
478    pub fn path(&self) -> &Path {
479        &self.path
480    }
481
482    /// Get statistics
483    pub fn stats(&self) -> io::Result<PageManagerStats> {
484        let header = self.header.read();
485        let file = self.file.read();
486        let file_size = file.metadata()?.len();
487
488        // Count free pages by walking free list
489        let mut free_count = 0u64;
490        let mut current = header.free_list_head;
491
492        drop(header);
493        drop(file);
494
495        while current != 0 {
496            free_count += 1;
497            let page_data = self.read_page(current)?;
498            let free_header = FreePageHeader::from_bytes(&page_data)?;
499            current = free_header.next_free;
500
501            // Safety: prevent infinite loops
502            if free_count > 1_000_000 {
503                break;
504            }
505        }
506
507        let header = self.header.read();
508        let used_pages = header.total_pages - free_count;
509        let logical_size = used_pages * self.page_size as u64;
510        let space_amp = if logical_size > 0 {
511            file_size as f64 / logical_size as f64
512        } else {
513            1.0
514        };
515
516        Ok(PageManagerStats {
517            total_pages: header.total_pages,
518            used_pages,
519            free_pages: free_count,
520            allocations: self.allocations.load(Ordering::Relaxed),
521            deallocations: self.deallocations.load(Ordering::Relaxed),
522            page_size: self.page_size,
523            file_size,
524            space_amplification: space_amp,
525        })
526    }
527}
528
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_micros() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
535
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Name of the database file created inside each test's temporary directory.
    const DB_FILE: &str = "test.sochdb";

    /// A created file starts with two pages and reports the same after reopening.
    #[test]
    fn test_create_and_open() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join(DB_FILE);

        // Create, inspect, then close the manager before reopening.
        let created = PageManager::create(&db_path, DEFAULT_PAGE_SIZE).unwrap();
        assert_eq!(created.total_pages(), 2);
        assert_eq!(created.page_size(), DEFAULT_PAGE_SIZE);
        drop(created);

        // Reopen
        let reopened = PageManager::open(&db_path).unwrap();
        assert_eq!(reopened.total_pages(), 2);
    }

    /// Fresh allocations extend the file; a freed page is handed out again first.
    #[test]
    fn test_allocate_and_deallocate() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join(DB_FILE);
        let manager = PageManager::create(&db_path, DEFAULT_PAGE_SIZE).unwrap();

        // Pages 0 and 1 are the header and catalog, so new ids start at 2.
        let fresh: Vec<PageId> = (0..3).map(|_| manager.allocate_page().unwrap()).collect();
        assert_eq!(fresh, [2, 3, 4]);
        assert_eq!(manager.total_pages(), 5);

        // Free the middle page...
        manager.deallocate_page(fresh[1]).unwrap();

        // ...and the next allocation takes it back off the free list.
        let reused = manager.allocate_page().unwrap();
        assert_eq!(reused, 3);

        let snapshot = manager.stats().unwrap();
        assert_eq!(snapshot.total_pages, 5);
        assert_eq!(snapshot.free_pages, 0);
        assert!(snapshot.space_amplification < 1.5);
    }

    /// Bytes written to a page come back unchanged.
    #[test]
    fn test_read_write_page() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join(DB_FILE);
        let manager = PageManager::create(&db_path, DEFAULT_PAGE_SIZE).unwrap();
        let target = manager.allocate_page().unwrap();

        // Put a tag at the start of the page and a number in the middle.
        let number = 12345u64.to_le_bytes();
        let mut payload = vec![0u8; DEFAULT_PAGE_SIZE as usize];
        payload[0..4].copy_from_slice(b"TEST");
        payload[100..108].copy_from_slice(&number);
        manager.write_page(target, &payload).unwrap();

        let stored = manager.read_page(target).unwrap();
        assert_eq!(&stored[0..4], b"TEST");
        assert_eq!(&stored[100..108], &number);
    }

    /// A header survives a serialization round trip; a flipped byte is detected.
    #[test]
    fn test_header_validation() {
        let original = DbHeader::new(4096);
        assert!(original.validate());

        let encoded = original.to_bytes();
        assert!(DbHeader::from_bytes(&encoded).unwrap().validate());

        // Corrupt one byte inside the page-size field.
        let mut corrupted = encoded;
        corrupted[10] = 0xFF;
        let decoded = DbHeader::from_bytes(&corrupted).unwrap();
        assert!(!decoded.validate());
    }

    /// The header and catalog pages can never be returned to the free list.
    #[test]
    fn test_cannot_deallocate_system_pages() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join(DB_FILE);
        let manager = PageManager::create(&db_path, DEFAULT_PAGE_SIZE).unwrap();

        for system_page in [0, 1] {
            assert!(manager.deallocate_page(system_page).is_err());
        }
    }
}