1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
36use parking_lot::RwLock;
37use serde::{Deserialize, Serialize};
38use std::fs::{File, OpenOptions};
39use std::io::{self, Read, Seek, SeekFrom, Write};
40use std::path::{Path, PathBuf};
41use std::sync::atomic::{AtomicU64, Ordering};
42
/// Page size, in bytes, used when the caller has no specific requirement.
pub const DEFAULT_PAGE_SIZE: u32 = 4096;

/// Four-byte signature written at the very start of the database header.
/// A header whose magic differs from this value fails validation.
pub const SOCHDB_MAGIC: [u8; 4] = *b"TOON";

/// On-disk format version stamped into newly created headers.
pub const FORMAT_VERSION: u32 = 1;

/// Index of a page within the database file.
///
/// A page's byte offset is `page_id * page_size`. Page 0 holds the header and
/// the value 0 doubles as the "empty" marker for the free list head.
pub type PageId = u64;
54
/// Database file header, stored at the start of page 0.
///
/// `DbHeader::to_bytes` serialises the fields in declaration order as
/// little-endian values and zero-pads the result to `DbHeader::SIZE` bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbHeader {
    /// File signature; must equal `SOCHDB_MAGIC` for the header to validate.
    pub magic: [u8; 4],
    /// Format version; new headers are stamped with `FORMAT_VERSION`.
    pub version: u32,
    /// Size of every page in the file, in bytes.
    pub page_size: u32,
    /// Page id of the schema/catalog page (1 for newly created headers).
    pub schema_page: PageId,
    /// First page of the free list, or 0 when the free list is empty.
    pub free_list_head: PageId,
    /// Number of pages in the file, including the header and catalog pages
    /// (2 for newly created headers).
    pub total_pages: u64,
    /// Creation time in microseconds since the Unix epoch.
    pub created_us: u64,
    /// Time of the last recorded modification, in microseconds since the
    /// Unix epoch.
    pub modified_us: u64,
    /// CRC32 over every other field (see `compute_checksum`).
    pub checksum: u32,
}
77
78impl DbHeader {
79 pub const SIZE: usize = 128;
81
82 pub fn new(page_size: u32) -> Self {
84 let now = now_micros();
85 let mut header = Self {
86 magic: SOCHDB_MAGIC,
87 version: FORMAT_VERSION,
88 page_size,
89 schema_page: 1, free_list_head: 0, total_pages: 2, created_us: now,
93 modified_us: now,
94 checksum: 0,
95 };
96 header.checksum = header.compute_checksum();
97 header
98 }
99
100 fn compute_checksum(&self) -> u32 {
102 let mut hasher = crc32fast::Hasher::new();
103 hasher.update(&self.magic);
104 hasher.update(&self.version.to_le_bytes());
105 hasher.update(&self.page_size.to_le_bytes());
106 hasher.update(&self.schema_page.to_le_bytes());
107 hasher.update(&self.free_list_head.to_le_bytes());
108 hasher.update(&self.total_pages.to_le_bytes());
109 hasher.update(&self.created_us.to_le_bytes());
110 hasher.update(&self.modified_us.to_le_bytes());
111 hasher.finalize()
112 }
113
114 pub fn validate(&self) -> bool {
116 self.magic == SOCHDB_MAGIC && self.checksum == self.compute_checksum()
117 }
118
119 pub fn to_bytes(&self) -> [u8; Self::SIZE] {
121 let mut buf = [0u8; Self::SIZE];
122 let mut cursor = io::Cursor::new(&mut buf[..]);
123
124 cursor.write_all(&self.magic).unwrap();
125 cursor.write_u32::<LittleEndian>(self.version).unwrap();
126 cursor.write_u32::<LittleEndian>(self.page_size).unwrap();
127 cursor.write_u64::<LittleEndian>(self.schema_page).unwrap();
128 cursor
129 .write_u64::<LittleEndian>(self.free_list_head)
130 .unwrap();
131 cursor.write_u64::<LittleEndian>(self.total_pages).unwrap();
132 cursor.write_u64::<LittleEndian>(self.created_us).unwrap();
133 cursor.write_u64::<LittleEndian>(self.modified_us).unwrap();
134 cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
135
136 buf
137 }
138
139 pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
141 if buf.len() < Self::SIZE {
142 return Err(io::Error::new(
143 io::ErrorKind::InvalidData,
144 "Buffer too short for header",
145 ));
146 }
147
148 let mut cursor = io::Cursor::new(buf);
149 let mut magic = [0u8; 4];
150 cursor.read_exact(&mut magic)?;
151
152 let version = cursor.read_u32::<LittleEndian>()?;
153 let page_size = cursor.read_u32::<LittleEndian>()?;
154 let schema_page = cursor.read_u64::<LittleEndian>()?;
155 let free_list_head = cursor.read_u64::<LittleEndian>()?;
156 let total_pages = cursor.read_u64::<LittleEndian>()?;
157 let created_us = cursor.read_u64::<LittleEndian>()?;
158 let modified_us = cursor.read_u64::<LittleEndian>()?;
159 let checksum = cursor.read_u32::<LittleEndian>()?;
160
161 Ok(Self {
162 magic,
163 version,
164 page_size,
165 schema_page,
166 free_list_head,
167 total_pages,
168 created_us,
169 modified_us,
170 checksum,
171 })
172 }
173}
174
/// Link record written at the start of a page that sits on the free list.
#[derive(Debug, Clone)]
pub struct FreePageHeader {
    /// Next page on the free list, or 0 when this page is the last entry.
    pub next_free: PageId,
    /// Entry count stored with the link; deallocation always writes 1.
    /// NOTE(review): nothing in this file reads the field back — confirm its
    /// intended meaning.
    pub count: u32,
}
183
184impl FreePageHeader {
185 pub const SIZE: usize = 12;
187
188 pub fn to_bytes(&self) -> [u8; Self::SIZE] {
190 let mut buf = [0u8; Self::SIZE];
191 let mut cursor = io::Cursor::new(&mut buf[..]);
192 cursor.write_u64::<LittleEndian>(self.next_free).unwrap();
193 cursor.write_u32::<LittleEndian>(self.count).unwrap();
194 buf
195 }
196
197 pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
199 let mut cursor = io::Cursor::new(buf);
200 Ok(Self {
201 next_free: cursor.read_u64::<LittleEndian>()?,
202 count: cursor.read_u32::<LittleEndian>()?,
203 })
204 }
205}
206
/// Tag identifying what a page is used for, represented as a single byte.
///
/// NOTE(review): this enum is not referenced by the page manager code in this
/// part of the file; the per-variant notes below are inferred from the names
/// and should be confirmed against the code that reads/writes page tags.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Tag value 0 — presumably the database header page.
    Header = 0,
    /// Tag value 1 — presumably a catalog/schema page.
    Catalog = 1,
    /// Tag value 2 — presumably column-group data.
    ColumnGroup = 2,
    /// Tag value 3 — presumably an interior index node.
    IndexInterior = 3,
    /// Tag value 4 — presumably a leaf index node.
    IndexLeaf = 4,
    /// Tag value 5 — presumably a page on the free list.
    Free = 5,
    /// Tag value 6 — presumably overflow storage for oversized data.
    Overflow = 6,
}
226
/// Snapshot of page usage as reported by `PageManager::stats`.
#[derive(Debug, Clone, Default)]
pub struct PageManagerStats {
    /// Total number of pages recorded in the database header.
    pub total_pages: u64,
    /// Pages not on the free list (`total_pages` minus `free_pages`).
    pub used_pages: u64,
    /// Number of pages counted while walking the free list.
    pub free_pages: u64,
    /// Allocation counter of the manager instance; starts at zero each time
    /// a manager is created or opened.
    pub allocations: u64,
    /// Deallocation counter of the manager instance; starts at zero each time
    /// a manager is created or opened.
    pub deallocations: u64,
    /// Page size in bytes.
    pub page_size: u32,
    /// Size of the database file in bytes, from file metadata.
    pub file_size: u64,
    /// `file_size` divided by the bytes held in used pages
    /// (`used_pages * page_size`); 1.0 when there are no used bytes.
    pub space_amplification: f64,
}
247
/// Allocates, frees, reads and writes fixed-size pages in a single database
/// file, keeping the on-disk header in step with an in-memory copy.
pub struct PageManager {
    /// Location of the database file, as given to `create`/`open`.
    path: PathBuf,
    /// Handle to the database file; locked because every access seeks first.
    file: RwLock<File>,
    /// In-memory copy of the header stored at the start of the file.
    header: RwLock<DbHeader>,
    /// Page size in bytes, copied from the header when the manager is built.
    page_size: u32,
    /// Allocation counter for this instance; not persisted (starts at 0).
    allocations: AtomicU64,
    /// Deallocation counter for this instance; not persisted (starts at 0).
    deallocations: AtomicU64,
}
262
263impl PageManager {
264 pub fn create<P: AsRef<Path>>(path: P, page_size: u32) -> io::Result<Self> {
266 let path = path.as_ref().to_path_buf();
267
268 let mut file = OpenOptions::new()
270 .create(true)
271 .read(true)
272 .write(true)
273 .truncate(true)
274 .open(&path)?;
275
276 let header = DbHeader::new(page_size);
278 let header_bytes = header.to_bytes();
279 file.write_all(&header_bytes)?;
280
281 let padding = vec![0u8; page_size as usize - DbHeader::SIZE];
283 file.write_all(&padding)?;
284
285 let catalog_page = vec![0u8; page_size as usize];
287 file.write_all(&catalog_page)?;
288
289 file.sync_all()?;
290
291 Ok(Self {
292 path,
293 file: RwLock::new(file),
294 header: RwLock::new(header),
295 page_size,
296 allocations: AtomicU64::new(0),
297 deallocations: AtomicU64::new(0),
298 })
299 }
300
301 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
303 let path = path.as_ref().to_path_buf();
304
305 let mut file = OpenOptions::new().read(true).write(true).open(&path)?;
306
307 let mut header_buf = [0u8; DbHeader::SIZE];
309 file.seek(SeekFrom::Start(0))?;
310 file.read_exact(&mut header_buf)?;
311
312 let header = DbHeader::from_bytes(&header_buf)?;
313
314 if !header.validate() {
315 return Err(io::Error::new(
316 io::ErrorKind::InvalidData,
317 "Invalid database header or checksum",
318 ));
319 }
320
321 let page_size = header.page_size;
322
323 Ok(Self {
324 path,
325 file: RwLock::new(file),
326 header: RwLock::new(header),
327 page_size,
328 allocations: AtomicU64::new(0),
329 deallocations: AtomicU64::new(0),
330 })
331 }
332
333 pub fn allocate_page(&self) -> io::Result<PageId> {
337 let mut header = self.header.write();
338 let mut file = self.file.write();
339
340 self.allocations.fetch_add(1, Ordering::Relaxed);
341
342 if header.free_list_head != 0 {
343 let page_id = header.free_list_head;
345
346 let offset = page_id * self.page_size as u64;
348 file.seek(SeekFrom::Start(offset))?;
349
350 let mut free_header_buf = [0u8; FreePageHeader::SIZE];
351 file.read_exact(&mut free_header_buf)?;
352 let free_header = FreePageHeader::from_bytes(&free_header_buf)?;
353
354 header.free_list_head = free_header.next_free;
356 header.modified_us = now_micros();
357 header.checksum = header.compute_checksum();
358
359 file.seek(SeekFrom::Start(0))?;
361 file.write_all(&header.to_bytes())?;
362
363 Ok(page_id)
364 } else {
365 let page_id = header.total_pages;
367 header.total_pages += 1;
368 header.modified_us = now_micros();
369 header.checksum = header.compute_checksum();
370
371 file.seek(SeekFrom::Start(0))?;
373 file.write_all(&header.to_bytes())?;
374
375 let offset = page_id * self.page_size as u64;
377 file.seek(SeekFrom::Start(offset))?;
378 let zero_page = vec![0u8; self.page_size as usize];
379 file.write_all(&zero_page)?;
380
381 Ok(page_id)
382 }
383 }
384
385 pub fn deallocate_page(&self, page_id: PageId) -> io::Result<()> {
389 if page_id < 2 {
390 return Err(io::Error::new(
391 io::ErrorKind::InvalidInput,
392 "Cannot deallocate header or catalog pages",
393 ));
394 }
395
396 let mut header = self.header.write();
397 let mut file = self.file.write();
398
399 self.deallocations.fetch_add(1, Ordering::Relaxed);
400
401 let free_header = FreePageHeader {
403 next_free: header.free_list_head,
404 count: 1,
405 };
406
407 let offset = page_id * self.page_size as u64;
409 file.seek(SeekFrom::Start(offset))?;
410 file.write_all(&free_header.to_bytes())?;
411
412 header.free_list_head = page_id;
414 header.modified_us = now_micros();
415 header.checksum = header.compute_checksum();
416
417 file.seek(SeekFrom::Start(0))?;
418 file.write_all(&header.to_bytes())?;
419
420 Ok(())
421 }
422
423 pub fn read_page(&self, page_id: PageId) -> io::Result<Vec<u8>> {
425 let mut file = self.file.write();
426 let offset = page_id * self.page_size as u64;
427
428 file.seek(SeekFrom::Start(offset))?;
429 let mut buf = vec![0u8; self.page_size as usize];
430 file.read_exact(&mut buf)?;
431
432 Ok(buf)
433 }
434
435 pub fn write_page(&self, page_id: PageId, data: &[u8]) -> io::Result<()> {
437 if data.len() != self.page_size as usize {
438 return Err(io::Error::new(
439 io::ErrorKind::InvalidInput,
440 format!("Page data must be exactly {} bytes", self.page_size),
441 ));
442 }
443
444 let mut file = self.file.write();
445 let offset = page_id * self.page_size as u64;
446
447 file.seek(SeekFrom::Start(offset))?;
448 file.write_all(data)?;
449
450 {
452 let mut header = self.header.write();
453 header.modified_us = now_micros();
454 header.checksum = header.compute_checksum();
455 file.seek(SeekFrom::Start(0))?;
456 file.write_all(&header.to_bytes())?;
457 }
458
459 Ok(())
460 }
461
462 pub fn sync(&self) -> io::Result<()> {
464 self.file.read().sync_all()
465 }
466
467 pub fn page_size(&self) -> u32 {
469 self.page_size
470 }
471
472 pub fn total_pages(&self) -> u64 {
474 self.header.read().total_pages
475 }
476
477 pub fn path(&self) -> &Path {
479 &self.path
480 }
481
482 pub fn stats(&self) -> io::Result<PageManagerStats> {
484 let header = self.header.read();
485 let file = self.file.read();
486 let file_size = file.metadata()?.len();
487
488 let mut free_count = 0u64;
490 let mut current = header.free_list_head;
491
492 drop(header);
493 drop(file);
494
495 while current != 0 {
496 free_count += 1;
497 let page_data = self.read_page(current)?;
498 let free_header = FreePageHeader::from_bytes(&page_data)?;
499 current = free_header.next_free;
500
501 if free_count > 1_000_000 {
503 break;
504 }
505 }
506
507 let header = self.header.read();
508 let used_pages = header.total_pages - free_count;
509 let logical_size = used_pages * self.page_size as u64;
510 let space_amp = if logical_size > 0 {
511 file_size as f64 / logical_size as f64
512 } else {
513 1.0
514 };
515
516 Ok(PageManagerStats {
517 total_pages: header.total_pages,
518 used_pages,
519 free_pages: free_count,
520 allocations: self.allocations.load(Ordering::Relaxed),
521 deallocations: self.deallocations.load(Ordering::Relaxed),
522 page_size: self.page_size,
523 file_size,
524 space_amplification: space_amp,
525 })
526 }
527}
528
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_micros() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
535
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A freshly created database has two pages and can be reopened.
    #[test]
    fn test_create_and_open() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        {
            // Create, then drop the manager so the file handle is released.
            let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
            assert_eq!(pm.total_pages(), 2);
            assert_eq!(pm.page_size(), DEFAULT_PAGE_SIZE);
        }

        {
            // Reopen and check the header survived the round trip.
            let pm = PageManager::open(&path).unwrap();
            assert_eq!(pm.total_pages(), 2);
        }
    }

    /// New pages are appended in order and a freed page is reused first.
    #[test]
    fn test_allocate_and_deallocate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();

        // Pages 0 and 1 are taken by the header and catalog.
        let p1 = pm.allocate_page().unwrap();
        let p2 = pm.allocate_page().unwrap();
        let p3 = pm.allocate_page().unwrap();

        assert_eq!(p1, 2);
        assert_eq!(p2, 3);
        assert_eq!(p3, 4);
        assert_eq!(pm.total_pages(), 5);

        // Put the middle page on the free list.
        pm.deallocate_page(p2).unwrap();

        // The next allocation takes the freed page instead of growing the file.
        let p4 = pm.allocate_page().unwrap();
        assert_eq!(p4, 3);

        let stats = pm.stats().unwrap();
        assert_eq!(stats.total_pages, 5);
        assert_eq!(stats.free_pages, 0);
        assert!(stats.space_amplification < 1.5);
    }

    /// Data written to a page is read back unchanged.
    #[test]
    fn test_read_write_page() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
        let page_id = pm.allocate_page().unwrap();

        // Fill a page with a marker at the start and a number at offset 100.
        let mut data = vec![0u8; DEFAULT_PAGE_SIZE as usize];
        data[0..4].copy_from_slice(b"TEST");
        data[100..108].copy_from_slice(&12345u64.to_le_bytes());
        pm.write_page(page_id, &data).unwrap();

        // Read it back and check both pieces.
        let read_data = pm.read_page(page_id).unwrap();
        assert_eq!(&read_data[0..4], b"TEST");

        let value = u64::from_le_bytes(read_data[100..108].try_into().unwrap());
        assert_eq!(value, 12345);
    }

    /// A header validates after a serialise/parse round trip and fails
    /// validation once a covered byte is altered.
    #[test]
    fn test_header_validation() {
        let header = DbHeader::new(4096);
        assert!(header.validate());

        let bytes = header.to_bytes();
        let restored = DbHeader::from_bytes(&bytes).unwrap();
        assert!(restored.validate());

        // Byte 10 lies inside the serialised `page_size` field (bytes 8..12),
        // so changing it makes the stored checksum stale.
        let mut bad_bytes = bytes;
        bad_bytes[10] = 0xFF;
        let bad_header = DbHeader::from_bytes(&bad_bytes).unwrap();
        assert!(!bad_header.validate());
    }

    /// The header page (0) and catalog page (1) can never be freed.
    #[test]
    fn test_cannot_deallocate_system_pages() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();

        assert!(pm.deallocate_page(0).is_err());
        assert!(pm.deallocate_page(1).is_err());
    }
}