1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
39use parking_lot::RwLock;
40use serde::{Deserialize, Serialize};
41use std::fs::{File, OpenOptions};
42use std::io::{self, Read, Seek, SeekFrom, Write};
43use std::path::{Path, PathBuf};
44use std::sync::atomic::{AtomicU64, Ordering};
45
/// Page size, in bytes, used when the caller has no specific requirement.
pub const DEFAULT_PAGE_SIZE: u32 = 4 * 1024;

/// Four-byte magic number that a valid database header must start with.
pub const SOCHDB_MAGIC: [u8; 4] = [b'T', b'O', b'O', b'N'];

/// On-disk format version stamped into newly created headers.
pub const FORMAT_VERSION: u32 = 1;

/// Zero-based page index; page `n` starts at byte offset `n * page_size`.
pub type PageId = u64;
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct DbHeader {
61 pub magic: [u8; 4],
63 pub version: u32,
65 pub page_size: u32,
67 pub schema_page: PageId,
69 pub free_list_head: PageId,
71 pub total_pages: u64,
73 pub created_us: u64,
75 pub modified_us: u64,
77 pub checksum: u32,
79}
80
81impl DbHeader {
82 pub const SIZE: usize = 128;
84
85 pub fn new(page_size: u32) -> Self {
87 let now = now_micros();
88 let mut header = Self {
89 magic: SOCHDB_MAGIC,
90 version: FORMAT_VERSION,
91 page_size,
92 schema_page: 1, free_list_head: 0, total_pages: 2, created_us: now,
96 modified_us: now,
97 checksum: 0,
98 };
99 header.checksum = header.compute_checksum();
100 header
101 }
102
103 fn compute_checksum(&self) -> u32 {
105 let mut hasher = crc32fast::Hasher::new();
106 hasher.update(&self.magic);
107 hasher.update(&self.version.to_le_bytes());
108 hasher.update(&self.page_size.to_le_bytes());
109 hasher.update(&self.schema_page.to_le_bytes());
110 hasher.update(&self.free_list_head.to_le_bytes());
111 hasher.update(&self.total_pages.to_le_bytes());
112 hasher.update(&self.created_us.to_le_bytes());
113 hasher.update(&self.modified_us.to_le_bytes());
114 hasher.finalize()
115 }
116
117 pub fn validate(&self) -> bool {
119 self.magic == SOCHDB_MAGIC && self.checksum == self.compute_checksum()
120 }
121
122 pub fn to_bytes(&self) -> [u8; Self::SIZE] {
124 let mut buf = [0u8; Self::SIZE];
125 let mut cursor = io::Cursor::new(&mut buf[..]);
126
127 cursor.write_all(&self.magic).unwrap();
128 cursor.write_u32::<LittleEndian>(self.version).unwrap();
129 cursor.write_u32::<LittleEndian>(self.page_size).unwrap();
130 cursor.write_u64::<LittleEndian>(self.schema_page).unwrap();
131 cursor
132 .write_u64::<LittleEndian>(self.free_list_head)
133 .unwrap();
134 cursor.write_u64::<LittleEndian>(self.total_pages).unwrap();
135 cursor.write_u64::<LittleEndian>(self.created_us).unwrap();
136 cursor.write_u64::<LittleEndian>(self.modified_us).unwrap();
137 cursor.write_u32::<LittleEndian>(self.checksum).unwrap();
138
139 buf
140 }
141
142 pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
144 if buf.len() < Self::SIZE {
145 return Err(io::Error::new(
146 io::ErrorKind::InvalidData,
147 "Buffer too short for header",
148 ));
149 }
150
151 let mut cursor = io::Cursor::new(buf);
152 let mut magic = [0u8; 4];
153 cursor.read_exact(&mut magic)?;
154
155 let version = cursor.read_u32::<LittleEndian>()?;
156 let page_size = cursor.read_u32::<LittleEndian>()?;
157 let schema_page = cursor.read_u64::<LittleEndian>()?;
158 let free_list_head = cursor.read_u64::<LittleEndian>()?;
159 let total_pages = cursor.read_u64::<LittleEndian>()?;
160 let created_us = cursor.read_u64::<LittleEndian>()?;
161 let modified_us = cursor.read_u64::<LittleEndian>()?;
162 let checksum = cursor.read_u32::<LittleEndian>()?;
163
164 Ok(Self {
165 magic,
166 version,
167 page_size,
168 schema_page,
169 free_list_head,
170 total_pages,
171 created_us,
172 modified_us,
173 checksum,
174 })
175 }
176}
177
178#[derive(Debug, Clone)]
180pub struct FreePageHeader {
181 pub next_free: PageId,
183 pub count: u32,
185}
186
187impl FreePageHeader {
188 pub const SIZE: usize = 12;
190
191 pub fn to_bytes(&self) -> [u8; Self::SIZE] {
193 let mut buf = [0u8; Self::SIZE];
194 let mut cursor = io::Cursor::new(&mut buf[..]);
195 cursor.write_u64::<LittleEndian>(self.next_free).unwrap();
196 cursor.write_u32::<LittleEndian>(self.count).unwrap();
197 buf
198 }
199
200 pub fn from_bytes(buf: &[u8]) -> io::Result<Self> {
202 let mut cursor = io::Cursor::new(buf);
203 Ok(Self {
204 next_free: cursor.read_u64::<LittleEndian>()?,
205 count: cursor.read_u32::<LittleEndian>()?,
206 })
207 }
208}
209
/// Kind of content a page holds.
///
/// `#[repr(u8)]` with explicit discriminants pins each variant to a fixed
/// byte value, so the numbers below must not be reordered or reused.
///
/// NOTE(review): nothing in this file reads or writes a `PageType`; the
/// per-variant descriptions follow the variant names and should be confirmed
/// against the code that tags pages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PageType {
    /// Database header page (page 0 holds the `DbHeader`).
    Header = 0,
    /// Catalog page.
    Catalog = 1,
    /// Column-group data page.
    ColumnGroup = 2,
    /// Interior index node.
    IndexInterior = 3,
    /// Leaf index node.
    IndexLeaf = 4,
    /// Page on the free list.
    Free = 5,
    /// Overflow page.
    Overflow = 6,
}
229
/// Page-usage snapshot returned by `PageManager::stats`.
#[derive(Debug, Clone, Default)]
pub struct PageManagerStats {
    /// Total pages recorded in the header, free pages included.
    pub total_pages: u64,
    /// `total_pages` minus the pages found on the free list.
    pub used_pages: u64,
    /// Number of pages reachable from the free-list head.
    pub free_pages: u64,
    /// Allocations counted by this `PageManager` instance; starts at 0 on create/open.
    pub allocations: u64,
    /// Deallocations counted by this `PageManager` instance; starts at 0 on create/open.
    pub deallocations: u64,
    /// Page size in bytes.
    pub page_size: u32,
    /// Size of the database file in bytes, from file metadata.
    pub file_size: u64,
    /// `file_size / (used_pages * page_size)`; `1.0` when that denominator is 0.
    pub space_amplification: f64,
}
250
251pub struct PageManager {
253 path: PathBuf,
255 file: RwLock<File>,
257 header: RwLock<DbHeader>,
259 page_size: u32,
261 allocations: AtomicU64,
263 deallocations: AtomicU64,
264}
265
266impl PageManager {
267 pub fn create<P: AsRef<Path>>(path: P, page_size: u32) -> io::Result<Self> {
269 let path = path.as_ref().to_path_buf();
270
271 let mut file = OpenOptions::new()
273 .create(true)
274 .read(true)
275 .write(true)
276 .truncate(true)
277 .open(&path)?;
278
279 let header = DbHeader::new(page_size);
281 let header_bytes = header.to_bytes();
282 file.write_all(&header_bytes)?;
283
284 let padding = vec![0u8; page_size as usize - DbHeader::SIZE];
286 file.write_all(&padding)?;
287
288 let catalog_page = vec![0u8; page_size as usize];
290 file.write_all(&catalog_page)?;
291
292 file.sync_all()?;
293
294 Ok(Self {
295 path,
296 file: RwLock::new(file),
297 header: RwLock::new(header),
298 page_size,
299 allocations: AtomicU64::new(0),
300 deallocations: AtomicU64::new(0),
301 })
302 }
303
304 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
306 let path = path.as_ref().to_path_buf();
307
308 let mut file = OpenOptions::new().read(true).write(true).open(&path)?;
309
310 let mut header_buf = [0u8; DbHeader::SIZE];
312 file.seek(SeekFrom::Start(0))?;
313 file.read_exact(&mut header_buf)?;
314
315 let header = DbHeader::from_bytes(&header_buf)?;
316
317 if !header.validate() {
318 return Err(io::Error::new(
319 io::ErrorKind::InvalidData,
320 "Invalid database header or checksum",
321 ));
322 }
323
324 let page_size = header.page_size;
325
326 Ok(Self {
327 path,
328 file: RwLock::new(file),
329 header: RwLock::new(header),
330 page_size,
331 allocations: AtomicU64::new(0),
332 deallocations: AtomicU64::new(0),
333 })
334 }
335
336 pub fn allocate_page(&self) -> io::Result<PageId> {
340 let mut header = self.header.write();
341 let mut file = self.file.write();
342
343 self.allocations.fetch_add(1, Ordering::Relaxed);
344
345 if header.free_list_head != 0 {
346 let page_id = header.free_list_head;
348
349 let offset = page_id * self.page_size as u64;
351 file.seek(SeekFrom::Start(offset))?;
352
353 let mut free_header_buf = [0u8; FreePageHeader::SIZE];
354 file.read_exact(&mut free_header_buf)?;
355 let free_header = FreePageHeader::from_bytes(&free_header_buf)?;
356
357 header.free_list_head = free_header.next_free;
359 header.modified_us = now_micros();
360 header.checksum = header.compute_checksum();
361
362 file.seek(SeekFrom::Start(0))?;
364 file.write_all(&header.to_bytes())?;
365
366 Ok(page_id)
367 } else {
368 let page_id = header.total_pages;
370 header.total_pages += 1;
371 header.modified_us = now_micros();
372 header.checksum = header.compute_checksum();
373
374 file.seek(SeekFrom::Start(0))?;
376 file.write_all(&header.to_bytes())?;
377
378 let offset = page_id * self.page_size as u64;
380 file.seek(SeekFrom::Start(offset))?;
381 let zero_page = vec![0u8; self.page_size as usize];
382 file.write_all(&zero_page)?;
383
384 Ok(page_id)
385 }
386 }
387
388 pub fn deallocate_page(&self, page_id: PageId) -> io::Result<()> {
392 if page_id < 2 {
393 return Err(io::Error::new(
394 io::ErrorKind::InvalidInput,
395 "Cannot deallocate header or catalog pages",
396 ));
397 }
398
399 let mut header = self.header.write();
400 let mut file = self.file.write();
401
402 self.deallocations.fetch_add(1, Ordering::Relaxed);
403
404 let free_header = FreePageHeader {
406 next_free: header.free_list_head,
407 count: 1,
408 };
409
410 let offset = page_id * self.page_size as u64;
412 file.seek(SeekFrom::Start(offset))?;
413 file.write_all(&free_header.to_bytes())?;
414
415 header.free_list_head = page_id;
417 header.modified_us = now_micros();
418 header.checksum = header.compute_checksum();
419
420 file.seek(SeekFrom::Start(0))?;
421 file.write_all(&header.to_bytes())?;
422
423 Ok(())
424 }
425
426 pub fn read_page(&self, page_id: PageId) -> io::Result<Vec<u8>> {
428 let mut file = self.file.write();
429 let offset = page_id * self.page_size as u64;
430
431 file.seek(SeekFrom::Start(offset))?;
432 let mut buf = vec![0u8; self.page_size as usize];
433 file.read_exact(&mut buf)?;
434
435 Ok(buf)
436 }
437
438 pub fn write_page(&self, page_id: PageId, data: &[u8]) -> io::Result<()> {
440 if data.len() != self.page_size as usize {
441 return Err(io::Error::new(
442 io::ErrorKind::InvalidInput,
443 format!("Page data must be exactly {} bytes", self.page_size),
444 ));
445 }
446
447 let mut file = self.file.write();
448 let offset = page_id * self.page_size as u64;
449
450 file.seek(SeekFrom::Start(offset))?;
451 file.write_all(data)?;
452
453 {
455 let mut header = self.header.write();
456 header.modified_us = now_micros();
457 header.checksum = header.compute_checksum();
458 file.seek(SeekFrom::Start(0))?;
459 file.write_all(&header.to_bytes())?;
460 }
461
462 Ok(())
463 }
464
465 pub fn sync(&self) -> io::Result<()> {
467 self.file.read().sync_all()
468 }
469
470 pub fn page_size(&self) -> u32 {
472 self.page_size
473 }
474
475 pub fn total_pages(&self) -> u64 {
477 self.header.read().total_pages
478 }
479
480 pub fn path(&self) -> &Path {
482 &self.path
483 }
484
485 pub fn stats(&self) -> io::Result<PageManagerStats> {
487 let header = self.header.read();
488 let file = self.file.read();
489 let file_size = file.metadata()?.len();
490
491 let mut free_count = 0u64;
493 let mut current = header.free_list_head;
494
495 drop(header);
496 drop(file);
497
498 while current != 0 {
499 free_count += 1;
500 let page_data = self.read_page(current)?;
501 let free_header = FreePageHeader::from_bytes(&page_data)?;
502 current = free_header.next_free;
503
504 if free_count > 1_000_000 {
506 break;
507 }
508 }
509
510 let header = self.header.read();
511 let used_pages = header.total_pages - free_count;
512 let logical_size = used_pages * self.page_size as u64;
513 let space_amp = if logical_size > 0 {
514 file_size as f64 / logical_size as f64
515 } else {
516 1.0
517 };
518
519 Ok(PageManagerStats {
520 total_pages: header.total_pages,
521 used_pages,
522 free_pages: free_count,
523 allocations: self.allocations.load(Ordering::Relaxed),
524 deallocations: self.deallocations.load(Ordering::Relaxed),
525 page_size: self.page_size,
526 file_size,
527 space_amplification: space_amp,
528 })
529 }
530}
531
/// Current wall-clock time as whole microseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch. The
/// `u128` microsecond count is truncated to `u64` with `as`.
fn now_micros() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
538
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_create_and_open() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        {
            // A fresh database holds the header page and the catalog page.
            let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
            assert_eq!(pm.total_pages(), 2);
            assert_eq!(pm.page_size(), DEFAULT_PAGE_SIZE);
        }

        {
            // Reopening reads the same page count back from the header.
            let pm = PageManager::open(&path).unwrap();
            assert_eq!(pm.total_pages(), 2);
        }
    }

    #[test]
    fn test_allocate_and_deallocate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();

        // Fresh allocations extend the file sequentially after pages 0 and 1.
        let p1 = pm.allocate_page().unwrap();
        let p2 = pm.allocate_page().unwrap();
        let p3 = pm.allocate_page().unwrap();

        assert_eq!(p1, 2);
        assert_eq!(p2, 3);
        assert_eq!(p3, 4);
        assert_eq!(pm.total_pages(), 5);

        pm.deallocate_page(p2).unwrap();

        // The freed page is reused before the file grows again.
        let p4 = pm.allocate_page().unwrap();
        assert_eq!(p4, 3);

        let stats = pm.stats().unwrap();
        assert_eq!(stats.total_pages, 5);
        assert_eq!(stats.free_pages, 0);
        assert!(stats.space_amplification < 1.5);
    }

    #[test]
    fn test_free_list_persists_across_reopen() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        {
            let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
            let a = pm.allocate_page().unwrap();
            let _b = pm.allocate_page().unwrap();
            pm.deallocate_page(a).unwrap();
            pm.sync().unwrap();
        }

        let pm = PageManager::open(&path).unwrap();
        assert_eq!(pm.total_pages(), 4);
        assert_eq!(pm.stats().unwrap().free_pages, 1);

        // The persisted free-list head is handed out without growing the file.
        assert_eq!(pm.allocate_page().unwrap(), 2);
        assert_eq!(pm.total_pages(), 4);
    }

    #[test]
    fn test_read_write_page() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
        let page_id = pm.allocate_page().unwrap();

        let mut data = vec![0u8; DEFAULT_PAGE_SIZE as usize];
        data[0..4].copy_from_slice(b"TEST");
        data[100..108].copy_from_slice(&12345u64.to_le_bytes());
        pm.write_page(page_id, &data).unwrap();

        let read_data = pm.read_page(page_id).unwrap();
        assert_eq!(&read_data[0..4], b"TEST");

        let value = u64::from_le_bytes(read_data[100..108].try_into().unwrap());
        assert_eq!(value, 12345);
    }

    #[test]
    fn test_write_page_rejects_wrong_size() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();
        let page_id = pm.allocate_page().unwrap();

        let err = pm.write_page(page_id, &[0u8; 16]).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
    }

    #[test]
    fn test_header_validation() {
        let header = DbHeader::new(4096);
        assert!(header.validate());

        let bytes = header.to_bytes();
        let restored = DbHeader::from_bytes(&bytes).unwrap();
        assert!(restored.validate());

        // Flip a byte inside the page_size field; the checksum must catch it.
        let mut bad_bytes = bytes;
        bad_bytes[10] = 0xFF;
        let bad_header = DbHeader::from_bytes(&bad_bytes).unwrap();
        assert!(!bad_header.validate());
    }

    #[test]
    fn test_open_rejects_invalid_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("garbage.sochdb");

        // Long enough to read a header, but the magic is all zeros.
        std::fs::write(&path, vec![0u8; DbHeader::SIZE]).unwrap();
        assert!(PageManager::open(&path).is_err());
    }

    #[test]
    fn test_cannot_deallocate_system_pages() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sochdb");

        let pm = PageManager::create(&path, DEFAULT_PAGE_SIZE).unwrap();

        assert!(pm.deallocate_page(0).is_err());
        assert!(pm.deallocate_page(1).is_err());
    }
}
643}