Skip to main content

sochdb_storage/
page_manager.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Page-Based File Layout with Database Header (Task 8)
19//!
20//! Implements proper database file format with:
21//! - Magic header on page 0
22//! - Schema catalog page reference
23//! - Free list management
24//! - O(1) page allocation
25//!
26//! ## File Layout
27//!
28//! ```text
29//! Page 0: DbHeader (128 bytes reserved)
30//! Page 1: Schema Catalog (root of catalog B-tree)
31//! Page 2+: Data pages (column groups, indexes)
32//! ```
33//!
34//! ## Space Amplification Target
35//!
36//! SA = file_size / logical_data_size < 1.3 (30% overhead)
37
38use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
39use parking_lot::RwLock;
40use serde::{Deserialize, Serialize};
41use std::fs::{File, OpenOptions};
42use std::io::{self, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::atomic::{AtomicU64, Ordering};
45
46/// Default page size (4KB)
47pub const DEFAULT_PAGE_SIZE: u32 = 4096;
48
49/// Magic bytes for SochDB files
50pub const SOCHDB_MAGIC: [u8; 4] = *b"TOON";
51
52/// Current format version
53pub const FORMAT_VERSION: u32 = 1;
54
55/// Page ID type
56pub type PageId = u64;
57
58/// Database header (stored on page 0)
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct DbHeader {
61    /// Magic bytes "TOON"
62    pub magic: [u8; 4],
63    /// Format version
64    pub version: u32,
65    /// Page size in bytes
66    pub page_size: u32,
67    /// Page ID of schema catalog root
68    pub schema_page: PageId,
69    /// First free page ID (head of free list)
70    pub free_list_head: PageId,
71    /// Total number of allocated pages
72    pub total_pages: u64,
73    /// Database creation timestamp (microseconds)
74    pub created_us: u64,
75    /// Last modified timestamp (microseconds)
76    pub modified_us: u64,
77    /// Header checksum
78    pub checksum: u32,
79}
80
81impl DbHeader {
82    /// Size of header in bytes
83    pub const SIZE: usize = 128;
84
85    /// Create a new database header
86    pub fn new(page_size: u32) -> Self {
87        let now = now_micros();
88        let mut header = Self {
89            magic: SOCHDB_MAGIC,
90            version: FORMAT_VERSION,
91            page_size,
92            schema_page: 1,    // Page 1 is always schema catalog
93            free_list_head: 0, // No free pages initially
94            total_pages: 2,    // Header + Schema catalog
95            created_us: now,
96            modified_us: now,
97            checksum: 0,
98        };
99        header.checksum = header.compute_checksum();
100        header
101    }
102
103    /// Compute checksum (CRC32 of all fields except checksum)
104    fn compute_checksum(&self) -> u32 {
105        let mut hasher = crc32fast::Hasher::new();
106        hasher.update(&self.magic);
107        hasher.update(&self.version.to_le_bytes());
108        hasher.update(&self.page_size.to_le_bytes());
109        hasher.update(&self.schema_page.to_le_bytes());
110        hasher.update(&self.free_list_head.to_le_bytes());
111        hasher.update(&self.total_pages.to_le_bytes());
112        hasher.update(&self.created_us.to_le_bytes());
113        hasher.update(&self.modified_us.to_le_bytes());
114        hasher.finalize()
115    }
116
117    /// Validate checksum
118    pub fn validate(&self) -> bool {
119        self.magic == SOCHDB_MAGIC && self.checksum == self.compute_checksum()
120    }
121
122    /// Serialize to bytes
123    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
124        let mut buf = [0u8; Self::SIZE];
125        let mut cursor = io::Cursor::new(&mut buf[..]);
126
127        cursor.write_all(&self.magic).unwrap();
128        cursor.write_u32::<LittleEndian>(self.version).unwrap();
129        cursor.write_u32::<LittleEndian>(self.page_size).unwrap();
130        cursor.write_u64::<LittleEndian>(self.schema_page).unwrap();
131        cursor
132            .write_u64::<LittleEndian>(self.free_list_head)
133            .unwrap();
134        cursor.write_u64::<LittleEndian>(self.total_pages).unwrap();
135        cursor.write_u64::<LittleEndian>(self.created_us).unwrap();
136        cursor.write_u64::<LittleEndian>(self.modified_us).unwrap();
137        cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
138
139        buf
140    }
141
142    /// Deserialize from bytes
143    pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
144        if buf.len() < Self::SIZE {
145            return Err(io::Error::new(
146                io::ErrorKind::InvalidData,
147                "Buffer too short for header",
148            ));
149        }
150
151        let mut cursor = io::Cursor::new(buf);
152        let mut magic = [0u8; 4];
153        cursor.read_exact(&mut magic)?;
154
155        let version = cursor.read_u32::<LittleEndian>()?;
156        let page_size = cursor.read_u32::<LittleEndian>()?;
157        let schema_page = cursor.read_u64::<LittleEndian>()?;
158        let free_list_head = cursor.read_u64::<LittleEndian>()?;
159        let total_pages = cursor.read_u64::<LittleEndian>()?;
160        let created_us = cursor.read_u64::<LittleEndian>()?;
161        let modified_us = cursor.read_u64::<LittleEndian>()?;
162        let checksum = cursor.read_u32::<LittleEndian>()?;
163
164        Ok(Self {
165            magic,
166            version,
167            page_size,
168            schema_page,
169            free_list_head,
170            total_pages,
171            created_us,
172            modified_us,
173            checksum,
174        })
175    }
176}
177
178/// Free page header (linked list node)
179#[derive(Debug, Clone)]
180pub struct FreePageHeader {
181    /// Next free page ID (0 = end of list)
182    pub next_free: PageId,
183    /// Number of contiguous free pages (for coalescing)
184    pub count: u32,
185}
186
187impl FreePageHeader {
188    /// Size in bytes
189    pub const SIZE: usize = 12;
190
191    /// Serialize to bytes
192    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
193        let mut buf = [0u8; Self::SIZE];
194        let mut cursor = io::Cursor::new(&mut buf[..]);
195        cursor.write_u64::<LittleEndian>(self.next_free).unwrap();
196        cursor.write_u32::<LittleEndian>(self.count).unwrap();
197        buf
198    }
199
200    /// Deserialize from bytes
201    pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
202        let mut cursor = io::Cursor::new(buf);
203        Ok(Self {
204            next_free: cursor.read_u64::<LittleEndian>()?,
205            count: cursor.read_u32::<LittleEndian>()?,
206        })
207    }
208}
209
210/// Page type markers
211#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212#[repr(u8)]
213pub enum PageType {
214    /// Database header (page 0 only)
215    Header = 0,
216    /// Schema catalog
217    Catalog = 1,
218    /// Column group data
219    ColumnGroup = 2,
220    /// Index interior node
221    IndexInterior = 3,
222    /// Index leaf node
223    IndexLeaf = 4,
224    /// Free page
225    Free = 5,
226    /// Overflow page (for large values)
227    Overflow = 6,
228}
229
230/// Statistics for page manager
231#[derive(Debug, Clone, Default)]
232pub struct PageManagerStats {
233    /// Total pages allocated
234    pub total_pages: u64,
235    /// Pages currently in use
236    pub used_pages: u64,
237    /// Free pages
238    pub free_pages: u64,
239    /// Page allocations
240    pub allocations: u64,
241    /// Page deallocations
242    pub deallocations: u64,
243    /// Page size
244    pub page_size: u32,
245    /// Total file size
246    pub file_size: u64,
247    /// Space amplification ratio
248    pub space_amplification: f64,
249}
250
251/// Page manager for database file
252pub struct PageManager {
253    /// Database file path
254    path: PathBuf,
255    /// Database file handle
256    file: RwLock<File>,
257    /// Database header
258    header: RwLock<DbHeader>,
259    /// Page size
260    page_size: u32,
261    /// Stats counters
262    allocations: AtomicU64,
263    deallocations: AtomicU64,
264}
265
266impl PageManager {
267    /// Create a new database file
268    pub fn create<P: AsRef<Path>>(path: P, page_size: u32) -> io::Result<Self> {
269        let path = path.as_ref().to_path_buf();
270
271        // Create and initialize file
272        let mut file = OpenOptions::new()
273            .create(true)
274            .read(true)
275            .write(true)
276            .truncate(true)
277            .open(&path)?;
278
279        // Write header on page 0
280        let header = DbHeader::new(page_size);
281        let header_bytes = header.to_bytes();
282        file.write_all(&header_bytes)?;
283
284        // Pad to page size
285        let padding = vec![0u8; page_size as usize - DbHeader::SIZE];
286        file.write_all(&padding)?;
287
288        // Initialize page 1 (schema catalog)
289        let catalog_page = vec![0u8; page_size as usize];
290        file.write_all(&catalog_page)?;
291
292        file.sync_all()?;
293
294        Ok(Self {
295            path,
296            file: RwLock::new(file),
297            header: RwLock::new(header),
298            page_size,
299            allocations: AtomicU64::new(0),
300            deallocations: AtomicU64::new(0),
301        })
302    }
303
304    /// Open an existing database file
305    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
306        let path = path.as_ref().to_path_buf();
307
308        let mut file = OpenOptions::new().read(true).write(true).open(&path)?;
309
310        // Read header
311        let mut header_buf = [0u8; DbHeader::SIZE];
312        file.seek(SeekFrom::Start(0))?;
313        file.read_exact(&mut header_buf)?;
314
315        let header = DbHeader::from_bytes(&header_buf)?;
316
317        if !header.validate() {
318            return Err(io::Error::new(
319                io::ErrorKind::InvalidData,
320                "Invalid database header or checksum",
321            ));
322        }
323
324        let page_size = header.page_size;
325
326        Ok(Self {
327            path,
328            file: RwLock::new(file),
329            header: RwLock::new(header),
330            page_size,
331            allocations: AtomicU64::new(0),
332            deallocations: AtomicU64::new(0),
333        })
334    }
335
336    /// Allocate a new page - O(1) amortized
337    ///
338    /// First tries to pop from free list, otherwise extends file
339    pub fn allocate_page(&self) -> io::Result<PageId> {
340        let mut header = self.header.write();
341        let mut file = self.file.write();
342
343        self.allocations.fetch_add(1, Ordering::Relaxed);
344
345        if header.free_list_head != 0 {
346            // Pop from free list
347            let page_id = header.free_list_head;
348
349            // Read free page header to get next
350            let offset = page_id * self.page_size as u64;
351            file.seek(SeekFrom::Start(offset))?;
352
353            let mut free_header_buf = [0u8; FreePageHeader::SIZE];
354            file.read_exact(&mut free_header_buf)?;
355            let free_header = FreePageHeader::from_bytes(&free_header_buf)?;
356
357            // Update free list head
358            header.free_list_head = free_header.next_free;
359            header.modified_us = now_micros();
360            header.checksum = header.compute_checksum();
361
362            // Write updated header
363            file.seek(SeekFrom::Start(0))?;
364            file.write_all(&header.to_bytes())?;
365
366            Ok(page_id)
367        } else {
368            // Extend file
369            let page_id = header.total_pages;
370            header.total_pages += 1;
371            header.modified_us = now_micros();
372            header.checksum = header.compute_checksum();
373
374            // Write updated header
375            file.seek(SeekFrom::Start(0))?;
376            file.write_all(&header.to_bytes())?;
377
378            // Extend file with zeroed page
379            let offset = page_id * self.page_size as u64;
380            file.seek(SeekFrom::Start(offset))?;
381            let zero_page = vec![0u8; self.page_size as usize];
382            file.write_all(&zero_page)?;
383
384            Ok(page_id)
385        }
386    }
387
388    /// Deallocate a page - O(1)
389    ///
390    /// Adds page to head of free list
391    pub fn deallocate_page(&self, page_id: PageId) -> io::Result<()> {
392        if page_id < 2 {
393            return Err(io::Error::new(
394                io::ErrorKind::InvalidInput,
395                "Cannot deallocate header or catalog pages",
396            ));
397        }
398
399        let mut header = self.header.write();
400        let mut file = self.file.write();
401
402        self.deallocations.fetch_add(1, Ordering::Relaxed);
403
404        // Create free page header
405        let free_header = FreePageHeader {
406            next_free: header.free_list_head,
407            count: 1,
408        };
409
410        // Write free page header
411        let offset = page_id * self.page_size as u64;
412        file.seek(SeekFrom::Start(offset))?;
413        file.write_all(&free_header.to_bytes())?;
414
415        // Update database header
416        header.free_list_head = page_id;
417        header.modified_us = now_micros();
418        header.checksum = header.compute_checksum();
419
420        file.seek(SeekFrom::Start(0))?;
421        file.write_all(&header.to_bytes())?;
422
423        Ok(())
424    }
425
426    /// Read a page
427    pub fn read_page(&self, page_id: PageId) -> io::Result<Vec<u8>> {
428        let mut file = self.file.write();
429        let offset = page_id * self.page_size as u64;
430
431        file.seek(SeekFrom::Start(offset))?;
432        let mut buf = vec![0u8; self.page_size as usize];
433        file.read_exact(&mut buf)?;
434
435        Ok(buf)
436    }
437
438    /// Write a page
439    pub fn write_page(&self, page_id: PageId, data: &[u8]) -> io::Result<()> {
440        if data.len() != self.page_size as usize {
441            return Err(io::Error::new(
442                io::ErrorKind::InvalidInput,
443                format!("Page data must be exactly {} bytes", self.page_size),
444            ));
445        }
446
447        let mut file = self.file.write();
448        let offset = page_id * self.page_size as u64;
449
450        file.seek(SeekFrom::Start(offset))?;
451        file.write_all(data)?;
452
453        // Update modified time
454        {
455            let mut header = self.header.write();
456            header.modified_us = now_micros();
457            header.checksum = header.compute_checksum();
458            file.seek(SeekFrom::Start(0))?;
459            file.write_all(&header.to_bytes())?;
460        }
461
462        Ok(())
463    }
464
465    /// Sync all changes to disk
466    pub fn sync(&self) -> io::Result<()> {
467        self.file.read().sync_all()
468    }
469
470    /// Get page size
471    pub fn page_size(&self) -> u32 {
472        self.page_size
473    }
474
475    /// Get total pages
476    pub fn total_pages(&self) -> u64 {
477        self.header.read().total_pages
478    }
479
480    /// Get database file path
481    pub fn path(&self) -> &Path {
482        &self.path
483    }
484
485    /// Get statistics
486    pub fn stats(&self) -> io::Result<PageManagerStats> {
487        let header = self.header.read();
488        let file = self.file.read();
489        let file_size = file.metadata()?.len();
490
491        // Count free pages by walking free list
492        let mut free_count = 0u64;
493        let mut current = header.free_list_head;
494
495        drop(header);
496        drop(file);
497
498        while current != 0 {
499            free_count += 1;
500            let page_data = self.read_page(current)?;
501            let free_header = FreePageHeader::from_bytes(&page_data)?;
502            current = free_header.next_free;
503
504            // Safety: prevent infinite loops
505            if free_count > 1_000_000 {
506                break;
507            }
508        }
509
510        let header = self.header.read();
511        let used_pages = header.total_pages - free_count;
512        let logical_size = used_pages * self.page_size as u64;
513        let space_amp = if logical_size > 0 {
514            file_size as f64 / logical_size as f64
515        } else {
516            1.0
517        };
518
519        Ok(PageManagerStats {
520            total_pages: header.total_pages,
521            used_pages,
522            free_pages: free_count,
523            allocations: self.allocations.load(Ordering::Relaxed),
524            deallocations: self.deallocations.load(Ordering::Relaxed),
525            page_size: self.page_size,
526            file_size,
527            space_amplification: space_amp,
528        })
529    }
530}
531
532fn now_micros() -> u64 {
533    std::time::SystemTime::now()
534        .duration_since(std::time::UNIX_EPOCH)
535        .map(|d| d.as_micros() as u64)
536        .unwrap_or(0)
537}
538
539#[cfg(test)]
540mod tests {
541    use super::*;
542    use tempfile::tempdir;
543
544    #[test]
545    fn test_create_and_open() {
546        let dir = tempdir().unwrap();
547        let path = dir.path().join("test.sochdb");
548
549        // Create
550        {
551            let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
552            assert_eq!(pm.total_pages(), 2);
553            assert_eq!(pm.page_size(), DEFAULT_PAGE_SIZE);
554        }
555
556        // Reopen
557        {
558            let pm = PageManager::open(&path).unwrap();
559            assert_eq!(pm.total_pages(), 2);
560        }
561    }
562
563    #[test]
564    fn test_allocate_and_deallocate() {
565        let dir = tempdir().unwrap();
566        let path = dir.path().join("test.sochdb");
567
568        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
569
570        // Allocate pages
571        let p1 = pm.allocate_page().unwrap();
572        let p2 = pm.allocate_page().unwrap();
573        let p3 = pm.allocate_page().unwrap();
574
575        assert_eq!(p1, 2); // Pages 0,1 are header and catalog
576        assert_eq!(p2, 3);
577        assert_eq!(p3, 4);
578        assert_eq!(pm.total_pages(), 5);
579
580        // Deallocate p2
581        pm.deallocate_page(p2).unwrap();
582
583        // Next allocation should reuse p2
584        let p4 = pm.allocate_page().unwrap();
585        assert_eq!(p4, 3); // Reused from free list
586
587        // Check stats
588        let stats = pm.stats().unwrap();
589        assert_eq!(stats.total_pages, 5);
590        assert_eq!(stats.free_pages, 0);
591        assert!(stats.space_amplification < 1.5);
592    }
593
594    #[test]
595    fn test_read_write_page() {
596        let dir = tempdir().unwrap();
597        let path = dir.path().join("test.sochdb");
598
599        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
600        let page_id = pm.allocate_page().unwrap();
601
602        // Write data
603        let mut data = vec![0u8; DEFAULT_PAGE_SIZE as usize];
604        data[0..4].copy_from_slice(b"TEST");
605        data[100..108].copy_from_slice(&12345u64.to_le_bytes());
606        pm.write_page(page_id, &data).unwrap();
607
608        // Read back
609        let read_data = pm.read_page(page_id).unwrap();
610        assert_eq!(&read_data[0..4], b"TEST");
611
612        let value = u64::from_le_bytes(read_data[100..108].try_into().unwrap());
613        assert_eq!(value, 12345);
614    }
615
616    #[test]
617    fn test_header_validation() {
618        let header = DbHeader::new(4096);
619        assert!(header.validate());
620
621        let bytes = header.to_bytes();
622        let restored = DbHeader::from_bytes(&bytes).unwrap();
623        assert!(restored.validate());
624
625        // Tamper with data
626        let mut bad_bytes = bytes;
627        bad_bytes[10] = 0xFF;
628        let bad_header = DbHeader::from_bytes(&bad_bytes).unwrap();
629        assert!(!bad_header.validate());
630    }
631
632    #[test]
633    fn test_cannot_deallocate_system_pages() {
634        let dir = tempdir().unwrap();
635        let path = dir.path().join("test.sochdb");
636
637        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
638
639        // Cannot deallocate page 0 (header) or 1 (catalog)
640        assert!(pm.deallocate_page(0).is_err());
641        assert!(pm.deallocate_page(1).is_err());
642    }
643}