1use byteorder::{ByteOrder, LittleEndian};
63use parking_lot::RwLock;
64use serde::{Deserialize, Serialize};
65use std::collections::HashMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67
68use crate::{Result, SochDBError};
69
/// Default chunk size used when splitting file data into blocks (4 KiB).
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Upper bound on a single block's size (1 MiB); larger requests are clamped.
pub const MAX_BLOCK_SIZE: usize = 1024 * 1024;
75
/// Compression algorithm applied to a stored block.
///
/// The discriminant values are part of the on-disk format (stored as a single
/// byte in `BlockHeader` and `BlockRef`), so they must never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum BlockCompression {
    /// Data stored verbatim.
    None = 0,
    /// LZ4 block compression (fast, moderate ratio).
    Lz4 = 1,
    /// Zstandard compression (slower, better ratio).
    Zstd = 2,
}
87
88impl BlockCompression {
89 pub fn from_byte(b: u8) -> Self {
90 match b {
91 1 => BlockCompression::Lz4,
92 2 => BlockCompression::Zstd,
93 _ => BlockCompression::None,
94 }
95 }
96
97 pub fn to_byte(&self) -> u8 {
98 *self as u8
99 }
100}
101
/// Serializable reference to a block stored in a `BlockStore`.
///
/// Carries everything needed to locate, validate, and decompress the block
/// without consulting the store's in-memory index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockRef {
    /// Byte offset of the block's header inside the store's backing buffer.
    pub store_offset: u64,
    /// Length in bytes of the stored (possibly compressed) payload.
    pub compressed_len: u32,
    /// Length in bytes of the original, uncompressed data.
    pub original_len: u32,
    /// Compression algorithm recorded for this block.
    pub compression: BlockCompression,
    /// CRC32 of the stored payload (computed over the compressed bytes).
    pub checksum: u32,
}
116
117impl BlockRef {
120 pub const SERIALIZED_SIZE: usize = 21;
122
123 pub fn to_bytes(&self) -> Result<[u8; Self::SERIALIZED_SIZE]> {
125 let mut buf = [0u8; Self::SERIALIZED_SIZE];
126 LittleEndian::write_u64(&mut buf[0..8], self.store_offset);
127 LittleEndian::write_u32(&mut buf[8..12], self.compressed_len);
128 LittleEndian::write_u32(&mut buf[12..16], self.original_len);
129 buf[16] = self.compression as u8;
130 LittleEndian::write_u32(&mut buf[17..21], self.checksum);
131 Ok(buf)
132 }
133
134 pub fn from_bytes(data: &[u8]) -> Result<Self> {
136 if data.len() < Self::SERIALIZED_SIZE {
137 return Err(SochDBError::Serialization(format!(
138 "BlockRef too short: {} < {}",
139 data.len(),
140 Self::SERIALIZED_SIZE
141 )));
142 }
143
144 Ok(Self {
145 store_offset: LittleEndian::read_u64(&data[0..8]),
146 compressed_len: LittleEndian::read_u32(&data[8..12]),
147 original_len: LittleEndian::read_u32(&data[12..16]),
148 compression: BlockCompression::from_byte(data[16]),
149 checksum: LittleEndian::read_u32(&data[17..21]),
150 })
151 }
152}
153
/// Fixed-size header written immediately before each block's payload in the
/// store's backing buffer.
#[derive(Debug, Clone)]
pub struct BlockHeader {
    /// Magic bytes (`TBLK`) identifying a block boundary.
    pub magic: [u8; 4],
    /// On-disk compression tag (see `BlockCompression` discriminants).
    pub compression: u8,
    /// Uncompressed payload length in bytes.
    pub original_size: u32,
    /// Stored payload length in bytes.
    pub compressed_size: u32,
    /// CRC32 over the stored payload.
    pub checksum: u32,
}
179
180impl BlockHeader {
181 pub const MAGIC: [u8; 4] = *b"TBLK";
182 pub const SIZE: usize = 17; pub fn to_bytes(&self) -> [u8; Self::SIZE] {
186 let mut buf = [0u8; Self::SIZE];
187 buf[0..4].copy_from_slice(&Self::MAGIC);
188 buf[4] = self.compression;
189 LittleEndian::write_u32(&mut buf[5..9], self.original_size);
190 LittleEndian::write_u32(&mut buf[9..13], self.compressed_size);
191 LittleEndian::write_u32(&mut buf[13..17], self.checksum);
192 buf
193 }
194
195 pub fn from_bytes(buf: &[u8]) -> Result<Self> {
197 if buf.len() < Self::SIZE {
198 return Err(SochDBError::Corruption(format!(
199 "BlockHeader too short: {} < {}",
200 buf.len(),
201 Self::SIZE
202 )));
203 }
204
205 if buf[0..4] != Self::MAGIC {
206 return Err(SochDBError::Corruption(format!(
207 "Invalid block magic: expected {:?}, got {:?}",
208 Self::MAGIC,
209 &buf[0..4]
210 )));
211 }
212
213 Ok(Self {
214 magic: Self::MAGIC,
215 compression: buf[4],
216 original_size: LittleEndian::read_u32(&buf[5..9]),
217 compressed_size: LittleEndian::read_u32(&buf[9..13]),
218 checksum: LittleEndian::read_u32(&buf[13..17]),
219 })
220 }
221}
222
/// In-memory, append-only block store with transparent compression and
/// per-block reference counting.
pub struct BlockStore {
    /// Backing buffer holding `[header | payload]` records back to back.
    data: RwLock<Vec<u8>>,
    /// Next free byte offset; claimed atomically before the data lock is taken.
    next_offset: AtomicU64,
    /// Offset -> block metadata for every block ever written.
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Offset -> reference count (starts at 1 on write).
    ref_counts: RwLock<HashMap<u64, u32>>,
}
234
235impl BlockStore {
236 pub fn new() -> Self {
238 Self {
239 data: RwLock::new(Vec::new()),
240 next_offset: AtomicU64::new(0),
241 index: RwLock::new(HashMap::new()),
242 ref_counts: RwLock::new(HashMap::new()),
243 }
244 }
245
246 pub fn write_block(&self, data: &[u8]) -> Result<BlockRef> {
250 let compression = self.select_compression(data);
251 self.write_block_with_compression(data, compression)
252 }
253
254 pub fn write_block_with_compression(
256 &self,
257 data: &[u8],
258 compression: BlockCompression,
259 ) -> Result<BlockRef> {
260 let compressed = self.compress(data, compression)?;
262
263 let checksum = crc32fast::hash(&compressed);
265
266 let header_size = BlockHeader::SIZE;
268 let total_size = header_size + compressed.len();
269 let offset = self
270 .next_offset
271 .fetch_add(total_size as u64, Ordering::SeqCst);
272
273 let header = BlockHeader {
275 magic: BlockHeader::MAGIC,
276 compression: compression as u8,
277 original_size: data.len() as u32,
278 compressed_size: compressed.len() as u32,
279 checksum,
280 };
281
282 {
284 let mut store = self.data.write();
285 store.resize((offset + total_size as u64) as usize, 0);
286
287 let header_bytes = header.to_bytes();
289 store[offset as usize..offset as usize + header_size].copy_from_slice(&header_bytes);
290
291 store[offset as usize + header_size..offset as usize + total_size]
293 .copy_from_slice(&compressed);
294 }
295
296 let block_ref = BlockRef {
298 store_offset: offset,
299 compressed_len: compressed.len() as u32,
300 original_len: data.len() as u32,
301 compression,
302 checksum,
303 };
304
305 self.index.write().insert(offset, block_ref.clone());
307
308 self.ref_counts.write().insert(offset, 1);
310
311 Ok(block_ref)
312 }
313
314 pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
316 let offset = block_ref.store_offset as usize;
317 let header_size = BlockHeader::SIZE;
318 let total_size = header_size + block_ref.compressed_len as usize;
319
320 let compressed = {
322 let store = self.data.read();
323 if offset + total_size > store.len() {
324 return Err(SochDBError::NotFound("Block not found".into()));
325 }
326
327 let header_bytes = &store[offset..offset + header_size];
329 let _header = BlockHeader::from_bytes(header_bytes)?;
330
331 store[offset + header_size..offset + total_size].to_vec()
333 };
334
335 let checksum = crc32fast::hash(&compressed);
337 if checksum != block_ref.checksum {
338 return Err(SochDBError::Corruption("Block checksum mismatch".into()));
339 }
340
341 self.decompress(
343 &compressed,
344 block_ref.compression,
345 block_ref.original_len as usize,
346 )
347 }
348
349 fn select_compression(&self, data: &[u8]) -> BlockCompression {
351 if data.len() < 128 {
352 return BlockCompression::None; }
354
355 if is_soch_content(data) {
357 BlockCompression::Zstd } else if is_json_content(data) || is_compressible(data) {
359 BlockCompression::Lz4 } else {
361 BlockCompression::None }
363 }
364
365 fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
371 match compression {
372 BlockCompression::None => Ok(data.to_vec()),
373 BlockCompression::Lz4 => {
374 match lz4::block::compress(data, None, false) {
375 Ok(compressed) => {
376 if compressed.len() >= data.len() {
378 Ok(data.to_vec())
379 } else {
380 Ok(compressed)
381 }
382 }
383 Err(_) => {
384 Ok(data.to_vec())
386 }
387 }
388 }
389 BlockCompression::Zstd => {
390 match zstd::encode_all(data, 3) {
392 Ok(compressed) => {
393 if compressed.len() >= data.len() {
395 Ok(data.to_vec())
396 } else {
397 Ok(compressed)
398 }
399 }
400 Err(_) => {
401 Ok(data.to_vec())
403 }
404 }
405 }
406 }
407 }
408
409 fn decompress(
411 &self,
412 data: &[u8],
413 compression: BlockCompression,
414 original_size: usize,
415 ) -> Result<Vec<u8>> {
416 match compression {
417 BlockCompression::None => Ok(data.to_vec()),
418 BlockCompression::Lz4 => {
419 if data.len() == original_size {
421 return Ok(data.to_vec());
422 }
423
424 lz4::block::decompress(data, Some(original_size as i32)).map_err(|e| {
425 SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
426 })
427 }
428 BlockCompression::Zstd => {
429 if data.len() == original_size {
431 return Ok(data.to_vec());
432 }
433
434 zstd::decode_all(data).map_err(|e| {
435 SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
436 })
437 }
438 }
439 }
440
441 pub fn add_ref(&self, offset: u64) {
443 let mut refs = self.ref_counts.write();
444 *refs.entry(offset).or_insert(0) += 1;
445 }
446
447 pub fn release_ref(&self, offset: u64) -> bool {
449 let mut refs = self.ref_counts.write();
450 if let Some(count) = refs.get_mut(&offset) {
451 *count = count.saturating_sub(1);
452 return *count == 0;
453 }
454 false
455 }
456
457 pub fn stats(&self) -> BlockStoreStats {
459 let data = self.data.read();
460 let index = self.index.read();
461
462 let mut total_original = 0u64;
463 let mut total_compressed = 0u64;
464
465 for block_ref in index.values() {
466 total_original += block_ref.original_len as u64;
467 total_compressed += block_ref.compressed_len as u64;
468 }
469
470 BlockStoreStats {
471 total_bytes: data.len() as u64,
472 block_count: index.len(),
473 total_original_bytes: total_original,
474 total_compressed_bytes: total_compressed,
475 compression_ratio: if total_compressed > 0 {
476 total_original as f64 / total_compressed as f64
477 } else {
478 1.0
479 },
480 }
481 }
482}
483
impl Default for BlockStore {
    /// Equivalent to `BlockStore::new()`: an empty store.
    fn default() -> Self {
        Self::new()
    }
}
489
/// Snapshot of aggregate `BlockStore` metrics, as produced by `stats`.
#[derive(Debug, Clone, Default)]
pub struct BlockStoreStats {
    /// Total size of the backing buffer (headers + payloads) in bytes.
    pub total_bytes: u64,
    /// Number of blocks currently indexed.
    pub block_count: usize,
    /// Sum of all blocks' uncompressed lengths.
    pub total_original_bytes: u64,
    /// Sum of all blocks' stored (compressed) lengths.
    pub total_compressed_bytes: u64,
    /// `original / compressed`; 1.0 when nothing is stored.
    pub compression_ratio: f64,
}
504
/// Heuristic check for TOON/SOCH-style tabular content.
///
/// Classifies data as SOCH when the first 100 bytes (lossily decoded as
/// UTF-8) contain all three of the `[`, `{`, and `:` markers. Inputs
/// shorter than 10 bytes are never classified as SOCH.
pub fn is_soch_content(data: &[u8]) -> bool {
    if data.len() < 10 {
        return false;
    }

    let prefix = String::from_utf8_lossy(&data[..data.len().min(100)]);
    ['[', '{', ':'].iter().all(|&marker| prefix.contains(marker))
}
516
/// Cheap JSON sniff: content whose first byte is `{` or `[`.
///
/// Empty input is not JSON.
pub fn is_json_content(data: &[u8]) -> bool {
    matches!(data.first(), Some(b'{') | Some(b'['))
}
527
/// Entropy-style heuristic: data counts as compressible when fewer than half
/// of the sampled bytes (first 256 at most) are distinct values.
///
/// Inputs shorter than 64 bytes are never classified as compressible.
pub fn is_compressible(data: &[u8]) -> bool {
    if data.len() < 64 {
        return false;
    }

    let sample = &data[..data.len().min(256)];
    let mut seen = [false; 256];
    let mut distinct = 0usize;

    for &b in sample {
        // `replace` returns the previous flag: count only first sightings.
        if !std::mem::replace(&mut seen[b as usize], true) {
            distinct += 1;
        }
    }

    distinct < sample.len() / 2
}
549
/// Splits file data into fixed-size chunks stored as individual blocks.
pub struct FileBlockManager {
    /// Underlying block store holding each chunk.
    store: BlockStore,
    /// Chunk size in bytes used by `write_file`.
    block_size: usize,
}
557
558impl FileBlockManager {
559 pub fn new(block_size: usize) -> Self {
561 Self {
562 store: BlockStore::new(),
563 block_size: block_size.min(MAX_BLOCK_SIZE),
564 }
565 }
566
567 pub fn write_file(&self, data: &[u8]) -> Result<Vec<BlockRef>> {
569 let mut blocks = Vec::new();
570
571 for chunk in data.chunks(self.block_size) {
572 let block_ref = self.store.write_block(chunk)?;
573 blocks.push(block_ref);
574 }
575
576 Ok(blocks)
577 }
578
579 pub fn read_file(&self, blocks: &[BlockRef]) -> Result<Vec<u8>> {
581 let mut data = Vec::new();
582
583 for block_ref in blocks {
584 let block_data = self.store.read_block(block_ref)?;
585 data.extend(block_data);
586 }
587
588 Ok(data)
589 }
590
591 pub fn stats(&self) -> BlockStoreStats {
593 self.store.stats()
594 }
595}
596
impl Default for FileBlockManager {
    /// Uses the 4 KiB `DEFAULT_BLOCK_SIZE` chunking.
    fn default() -> Self {
        Self::new(DEFAULT_BLOCK_SIZE)
    }
}
602
603use std::fs::{File, OpenOptions};
608use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
609use std::path::{Path, PathBuf};
610
/// Type tag for WAL records.
///
/// Discriminants are part of the on-disk log format; do not renumber.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum BlockWalRecordType {
    /// A page write carrying the written bytes as payload.
    BlockWrite = 1,
    /// Marks that dirty pages up to this point were flushed to the data file.
    Checkpoint = 2,
    /// Commits every pending write of the record's transaction.
    Commit = 3,
    /// Marks the start of a transaction (no payload).
    TxnBegin = 4,
}
628
629impl BlockWalRecordType {
630 fn from_byte(b: u8) -> Option<Self> {
631 match b {
632 1 => Some(BlockWalRecordType::BlockWrite),
633 2 => Some(BlockWalRecordType::Checkpoint),
634 3 => Some(BlockWalRecordType::Commit),
635 4 => Some(BlockWalRecordType::TxnBegin),
636 _ => None,
637 }
638 }
639}
640
/// Fixed-size header preceding every WAL record's payload.
#[derive(Debug, Clone)]
pub struct WalRecordHeader {
    /// Log sequence number: the record's byte offset within the WAL file
    /// (see `WalWriter`, which derives it from the file length).
    pub lsn: u64,
    /// Transaction this record belongs to.
    pub txn_id: u64,
    /// Kind of record (write, checkpoint, commit, begin).
    pub record_type: BlockWalRecordType,
    /// Page targeted by a `BlockWrite`; 0 for control records.
    pub page_id: u64,
    /// Length of the payload that follows this header.
    pub data_len: u32,
    /// CRC32 over the header (excluding this field) plus the payload.
    pub crc32: u32,
}
663
664impl WalRecordHeader {
665 pub const SIZE: usize = 33;
666
667 pub fn to_bytes(&self) -> [u8; Self::SIZE] {
669 let mut buf = [0u8; Self::SIZE];
670 LittleEndian::write_u64(&mut buf[0..8], self.lsn);
671 LittleEndian::write_u64(&mut buf[8..16], self.txn_id);
672 buf[16] = self.record_type as u8;
673 LittleEndian::write_u64(&mut buf[17..25], self.page_id);
674 LittleEndian::write_u32(&mut buf[25..29], self.data_len);
675 LittleEndian::write_u32(&mut buf[29..33], self.crc32);
676 buf
677 }
678
679 pub fn from_bytes(buf: &[u8]) -> Result<Self> {
681 if buf.len() < Self::SIZE {
682 return Err(SochDBError::Corruption(format!(
683 "WAL record header too short: {} < {}",
684 buf.len(),
685 Self::SIZE
686 )));
687 }
688
689 let record_type = BlockWalRecordType::from_byte(buf[16]).ok_or_else(|| {
690 SochDBError::Corruption(format!("Invalid WAL record type: {}", buf[16]))
691 })?;
692
693 Ok(Self {
694 lsn: LittleEndian::read_u64(&buf[0..8]),
695 txn_id: LittleEndian::read_u64(&buf[8..16]),
696 record_type,
697 page_id: LittleEndian::read_u64(&buf[17..25]),
698 data_len: LittleEndian::read_u32(&buf[25..29]),
699 crc32: LittleEndian::read_u32(&buf[29..33]),
700 })
701 }
702
703 pub fn compute_crc32(&self, data: &[u8]) -> u32 {
705 let mut hasher = crc32fast::Hasher::new();
706
707 let mut header_buf = [0u8; 29];
709 LittleEndian::write_u64(&mut header_buf[0..8], self.lsn);
710 LittleEndian::write_u64(&mut header_buf[8..16], self.txn_id);
711 header_buf[16] = self.record_type as u8;
712 LittleEndian::write_u64(&mut header_buf[17..25], self.page_id);
713 LittleEndian::write_u32(&mut header_buf[25..29], self.data_len);
714
715 hasher.update(&header_buf);
716 hasher.update(data);
717 hasher.finalize()
718 }
719}
720
/// Appends records to a write-ahead log file.
pub struct WalWriter {
    /// Buffered handle to the log file, opened in append mode.
    file: BufWriter<File>,
    /// LSN (byte offset in the file) the next record will receive.
    next_lsn: u64,
    /// Path of the log file (retained, currently unused).
    #[allow(dead_code)]
    path: PathBuf,
}
731
impl WalWriter {
    /// Opens (or creates) a WAL file in append mode.
    ///
    /// The next LSN is initialized to the current file length — LSNs are
    /// byte offsets into the log, so an existing log resumes where it ended.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&path)?;

        let metadata = file.metadata()?;
        let next_lsn = metadata.len();

        Ok(Self {
            file: BufWriter::new(file),
            next_lsn,
            path,
        })
    }

    /// Appends one record (header + payload) and returns its LSN.
    ///
    /// The CRC is computed over the header fields (excluding the CRC slot)
    /// plus the payload. Output is buffered; call `sync` for durability.
    pub fn append(
        &mut self,
        txn_id: u64,
        record_type: BlockWalRecordType,
        page_id: u64,
        data: &[u8],
    ) -> Result<u64> {
        let lsn = self.next_lsn;

        let mut header = WalRecordHeader {
            lsn,
            txn_id,
            record_type,
            page_id,
            data_len: data.len() as u32,
            crc32: 0, // placeholder — filled in below once the CRC is known
        };

        header.crc32 = header.compute_crc32(data);

        let header_bytes = header.to_bytes();
        self.file.write_all(&header_bytes)?;

        self.file.write_all(data)?;

        // Advance by the full on-disk footprint so LSNs remain byte offsets.
        self.next_lsn += WalRecordHeader::SIZE as u64 + data.len() as u64;

        Ok(lsn)
    }

    /// Flushes buffered records and fsyncs the file to stable storage.
    pub fn sync(&mut self) -> Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        Ok(())
    }

    /// Returns the LSN the next appended record will receive.
    pub fn current_lsn(&self) -> u64 {
        self.next_lsn
    }
}
802
/// Sequentially reads records back from a write-ahead log file.
pub struct WalReader {
    /// Buffered handle positioned at the next unread record.
    reader: BufReader<File>,
    /// Path of the log file (retained, currently unused).
    #[allow(dead_code)]
    path: PathBuf,
}
809
impl WalReader {
    /// Opens a WAL file for sequential reading from the start.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = File::open(&path)?;

        Ok(Self {
            reader: BufReader::new(file),
            path,
        })
    }

    /// Reads the next record, or `Ok(None)` at end-of-log.
    ///
    /// EOF while reading the header is treated as end-of-log (this also
    /// silently drops a torn, partially-written trailing header). EOF inside
    /// the payload, an invalid header, or a CRC mismatch is an error.
    pub fn read_next(&mut self) -> Result<Option<(WalRecordHeader, Vec<u8>)>> {
        let mut header_buf = [0u8; WalRecordHeader::SIZE];
        match self.reader.read_exact(&mut header_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        }

        let header = WalRecordHeader::from_bytes(&header_buf)?;

        let mut data = vec![0u8; header.data_len as usize];
        self.reader.read_exact(&mut data)?;

        // Verify record integrity before handing it to the caller.
        let computed_crc = header.compute_crc32(&data);
        if computed_crc != header.crc32 {
            return Err(SochDBError::Corruption(format!(
                "WAL CRC mismatch at LSN {}: expected {:#x}, got {:#x}",
                header.lsn, header.crc32, computed_crc
            )));
        }

        Ok(Some((header, data)))
    }

    /// Returns an iterator over the remaining records.
    pub fn iter(&mut self) -> WalIterator<'_> {
        WalIterator { reader: self }
    }
}
855
/// Iterator adapter over a `WalReader`'s remaining records.
pub struct WalIterator<'a> {
    /// Reader being drained; advances with every `next` call.
    reader: &'a mut WalReader,
}
860
861impl<'a> Iterator for WalIterator<'a> {
862 type Item = Result<(WalRecordHeader, Vec<u8>)>;
863
864 fn next(&mut self) -> Option<Self::Item> {
865 match self.reader.read_next() {
866 Ok(Some(record)) => Some(Ok(record)),
867 Ok(None) => None,
868 Err(e) => Some(Err(e)),
869 }
870 }
871}
872
/// Block store with write-ahead logging for crash recovery.
///
/// Writes go to the WAL first (fsynced), then into the in-memory store;
/// `checkpoint` flushes accumulated dirty pages into `blocks.dat`.
pub struct DurableBlockStore {
    /// In-memory block store holding the live data.
    store: BlockStore,
    /// Append-only write-ahead log.
    wal: parking_lot::Mutex<WalWriter>,
    /// Backing data file that `checkpoint` writes pages into.
    data_file: parking_lot::Mutex<File>,
    /// Pages written since the last checkpoint, keyed by page id.
    dirty_pages: RwLock<HashMap<u64, Vec<u8>>>,
    /// LSN of the most recent checkpoint record.
    checkpoint_lsn: AtomicU64,
    /// Directory containing `wal.log` and `blocks.dat`.
    data_dir: PathBuf,
    /// Monotonic page-id allocator.
    next_page_id: AtomicU64,
}
895
impl DurableBlockStore {
    /// Opens (or creates) a durable store rooted at `data_dir`.
    ///
    /// Sets up `wal.log` (write-ahead log) and `blocks.dat` (page file).
    /// This does NOT replay the WAL — callers must invoke `recover` to
    /// reapply committed-but-unflushed writes.
    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();

        std::fs::create_dir_all(&data_dir)?;

        let wal_path = data_dir.join("wal.log");
        let data_path = data_dir.join("blocks.dat");

        let wal = WalWriter::open(&wal_path)?;

        let data_file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&data_path)?;

        let store = Self {
            store: BlockStore::new(),
            wal: parking_lot::Mutex::new(wal),
            data_file: parking_lot::Mutex::new(data_file),
            dirty_pages: RwLock::new(HashMap::new()),
            checkpoint_lsn: AtomicU64::new(0),
            data_dir,
            next_page_id: AtomicU64::new(0),
        };

        Ok(store)
    }

    /// Writes a block using WAL-first ordering.
    ///
    /// The record is appended and fsynced to the WAL *before* the in-memory
    /// store or dirty-page table are touched, so a crash after the sync can
    /// replay this write during `recover`.
    pub fn write_block(&self, txn_id: u64, data: &[u8]) -> Result<BlockRef> {
        let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);

        {
            let mut wal = self.wal.lock();
            wal.append(txn_id, BlockWalRecordType::BlockWrite, page_id, data)?;
            // Durability point: from here the write survives a crash.
            wal.sync()?;
        }

        let block_ref = self.store.write_block(data)?;

        self.dirty_pages.write().insert(page_id, data.to_vec());

        Ok(block_ref)
    }

    /// Reads a block from the in-memory store.
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        self.store.read_block(block_ref)
    }

    /// Appends and fsyncs a commit record for `txn_id`; returns its LSN.
    pub fn commit(&self, txn_id: u64) -> Result<u64> {
        let mut wal = self.wal.lock();
        let lsn = wal.append(txn_id, BlockWalRecordType::Commit, 0, &[])?;
        wal.sync()?;
        Ok(lsn)
    }

    /// Flushes all dirty pages to `blocks.dat`, then logs a checkpoint.
    ///
    /// NOTE(review): the offset math assumes each page's payload fits in
    /// DEFAULT_BLOCK_SIZE and writes raw bytes with no BlockHeader; a
    /// payload larger than one slot would overlap the next page's region.
    /// Confirm the intended `blocks.dat` layout.
    pub fn checkpoint(&self) -> Result<u64> {
        // Drain dirty pages first so new writes can accumulate concurrently.
        let dirty: Vec<(u64, Vec<u8>)> = {
            let mut pages = self.dirty_pages.write();
            pages.drain().collect()
        };

        {
            let mut file = self.data_file.lock();
            for (page_id, data) in &dirty {
                let offset = *page_id * (DEFAULT_BLOCK_SIZE as u64 + BlockHeader::SIZE as u64);
                file.seek(SeekFrom::Start(offset))?;
                file.write_all(data)?;
            }
            file.sync_all()?;
        }

        // Only after the data file is durable is the checkpoint recorded.
        let lsn = {
            let mut wal = self.wal.lock();
            let lsn = wal.append(0, BlockWalRecordType::Checkpoint, 0, &[])?;
            wal.sync()?;
            lsn
        };

        self.checkpoint_lsn.store(lsn, Ordering::SeqCst);

        Ok(lsn)
    }

    /// Replays the WAL, reapplying writes of committed transactions only.
    ///
    /// Writes belonging to transactions without a commit record are dropped
    /// and counted as aborted.
    ///
    /// NOTE(review): the whole log is replayed regardless of the recorded
    /// checkpoint LSN, and recovered pages are not re-inserted into
    /// `dirty_pages` (so a later `checkpoint` will not persist them) —
    /// confirm whether that is intended.
    pub fn recover(&mut self) -> Result<RecoveryStats> {
        let wal_path = self.data_dir.join("wal.log");

        if !wal_path.exists() {
            return Ok(RecoveryStats::default());
        }

        let mut reader = WalReader::open(&wal_path)?;
        let mut stats = RecoveryStats::default();

        // Buffer writes per transaction; only committed ones get applied.
        let mut pending_txns: HashMap<u64, Vec<(u64, Vec<u8>)>> = HashMap::new();
        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();

        for record_result in reader.iter() {
            let (header, data) = record_result?;
            stats.records_read += 1;

            match header.record_type {
                BlockWalRecordType::BlockWrite => {
                    pending_txns
                        .entry(header.txn_id)
                        .or_default()
                        .push((header.page_id, data));
                }
                BlockWalRecordType::Commit => {
                    committed_txns.insert(header.txn_id);
                    stats.txns_committed += 1;
                }
                BlockWalRecordType::Checkpoint => {
                    self.checkpoint_lsn.store(header.lsn, Ordering::SeqCst);
                    stats.checkpoints_found += 1;
                }
                BlockWalRecordType::TxnBegin => {
                    // Begin markers carry no state to replay.
                }
            }
        }

        for txn_id in &committed_txns {
            if let Some(writes) = pending_txns.remove(txn_id) {
                for (page_id, data) in writes {
                    self.store.write_block(&data)?;
                    // Keep the allocator ahead of every recovered page id.
                    self.next_page_id.fetch_max(page_id + 1, Ordering::SeqCst);
                    stats.blocks_recovered += 1;
                }
            }
        }

        // Whatever never committed is treated as aborted.
        stats.txns_aborted = pending_txns.len();

        Ok(stats)
    }

    /// Returns combined block-store and durability statistics.
    pub fn stats(&self) -> DurableBlockStoreStats {
        let store_stats = self.store.stats();
        DurableBlockStoreStats {
            block_stats: store_stats,
            dirty_page_count: self.dirty_pages.read().len(),
            checkpoint_lsn: self.checkpoint_lsn.load(Ordering::SeqCst),
            wal_size: self.wal.lock().current_lsn(),
        }
    }
}
1077
/// Counters produced by `DurableBlockStore::recover`.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Total WAL records scanned.
    pub records_read: usize,
    /// Transactions with a commit record.
    pub txns_committed: usize,
    /// Transactions whose writes were discarded (no commit seen).
    pub txns_aborted: usize,
    /// Block writes reapplied to the store.
    pub blocks_recovered: usize,
    /// Checkpoint records encountered in the log.
    pub checkpoints_found: usize,
}
1092
/// Snapshot of a `DurableBlockStore`'s state, as produced by `stats`.
#[derive(Debug, Clone)]
pub struct DurableBlockStoreStats {
    /// Statistics of the in-memory block store.
    pub block_stats: BlockStoreStats,
    /// Pages written since the last checkpoint.
    pub dirty_page_count: usize,
    /// LSN of the most recent checkpoint.
    pub checkpoint_lsn: u64,
    /// Current WAL length in bytes (next LSN to be assigned).
    pub wal_size: u64,
}
1105
#[cfg(test)]
mod tests {
    //! Unit tests covering block storage round-trips, serialization layouts,
    //! compression behavior, and WAL durability/recovery.

    use super::*;

    // --- BlockStore basics ---

    #[test]
    fn test_block_store_write_read() {
        let store = BlockStore::new();

        let data = b"Hello, SochFS block storage!";
        let block_ref = store.write_block(data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_compression_selection() {
        let store = BlockStore::new();

        // Below the 128-byte threshold: never compressed.
        let small = b"hi";
        assert_eq!(store.select_compression(small), BlockCompression::None);

        let toon = b"users[5]{id,name}:\n1,Alice\n2,Bob\n3,Charlie";
        let compression = store.select_compression(toon);
        assert!(compression == BlockCompression::Zstd || compression == BlockCompression::None);

        let json = br#"{"users": [{"id": 1, "name": "Alice"}]}"#;
        let compression = store.select_compression(json);
        assert!(compression == BlockCompression::Lz4 || compression == BlockCompression::None);
    }

    #[test]
    fn test_lz4_compression() {
        let store = BlockStore::new();

        let data = "Hello, world! ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Lz4)
            .unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_zstd_compression() {
        let store = BlockStore::new();

        let data = "TOON format is very repetitive ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Zstd)
            .unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_file_block_manager() {
        let manager = FileBlockManager::new(1024);

        let data = "Test data ".repeat(500);
        let blocks = manager.write_file(data.as_bytes()).unwrap();

        // 5000 bytes at 1024-byte chunks -> multiple blocks.
        assert!(blocks.len() > 1);
        let read_data = manager.read_file(&blocks).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_stats() {
        let store = BlockStore::new();

        let data = "Repetitive data pattern ".repeat(100);
        store.write_block(data.as_bytes()).unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(stats.compression_ratio >= 1.0);
    }

    // --- serialization layout tests ---

    #[test]
    fn test_block_header_fixed_layout() {
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Zstd as u8,
            original_size: 4096,
            compressed_size: 1024,
            checksum: 0xDEADBEEF,
        };

        let bytes = header.to_bytes();

        assert_eq!(bytes.len(), 17);

        assert_eq!(&bytes[0..4], b"TBLK");

        assert_eq!(bytes[4], BlockCompression::Zstd as u8);

        assert_eq!(LittleEndian::read_u32(&bytes[5..9]), 4096);

        assert_eq!(LittleEndian::read_u32(&bytes[9..13]), 1024);

        assert_eq!(LittleEndian::read_u32(&bytes[13..17]), 0xDEADBEEF);
    }

    #[test]
    fn test_block_header_roundtrip() {
        let original = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Lz4 as u8,
            original_size: 65536,
            compressed_size: 32768,
            checksum: 0x12345678,
        };

        let bytes = original.to_bytes();
        let recovered = BlockHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.original_size, original.original_size);
        assert_eq!(recovered.compressed_size, original.compressed_size);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_header_invalid_magic() {
        let mut bytes = [0u8; 17];
        bytes[0..4].copy_from_slice(b"XXXX");
        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid block magic"));
    }

    #[test]
    fn test_block_header_too_short() {
        let bytes = [0u8; 10];
        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    #[test]
    fn test_block_ref_fixed_layout() {
        let block_ref = BlockRef {
            store_offset: 0x123456789ABCDEF0,
            compressed_len: 4096,
            original_len: 8192,
            compression: BlockCompression::Zstd,
            checksum: 0xCAFEBABE,
        };

        let bytes = block_ref.to_bytes().unwrap();

        assert_eq!(bytes.len(), 21);

        assert_eq!(LittleEndian::read_u64(&bytes[0..8]), 0x123456789ABCDEF0);

        assert_eq!(LittleEndian::read_u32(&bytes[8..12]), 4096);

        assert_eq!(LittleEndian::read_u32(&bytes[12..16]), 8192);

        assert_eq!(bytes[16], BlockCompression::Zstd as u8);

        assert_eq!(LittleEndian::read_u32(&bytes[17..21]), 0xCAFEBABE);
    }

    #[test]
    fn test_block_ref_roundtrip() {
        // Extreme field values must survive a serialize/parse cycle.
        let original = BlockRef {
            store_offset: u64::MAX,
            compressed_len: u32::MAX,
            original_len: u32::MAX,
            compression: BlockCompression::None,
            checksum: u32::MAX,
        };

        let bytes = original.to_bytes().unwrap();
        let recovered = BlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.store_offset, original.store_offset);
        assert_eq!(recovered.compressed_len, original.compressed_len);
        assert_eq!(recovered.original_len, original.original_len);
        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_ref_too_short() {
        let bytes = [0u8; 10];
        let result = BlockRef::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlockRef too short"));
    }

    #[test]
    fn test_cross_platform_compatibility() {
        // Pin the little-endian byte order of the serialized layout.
        let block_ref = BlockRef {
            store_offset: 0x0102030405060708,
            compressed_len: 0x0A0B0C0D,
            original_len: 0x0E0F1011,
            compression: BlockCompression::Lz4,
            checksum: 0x12131415,
        };

        let bytes = block_ref.to_bytes().unwrap();

        assert_eq!(bytes[0], 0x08);
        assert_eq!(bytes[7], 0x01);
        assert_eq!(bytes[8], 0x0D);
        assert_eq!(bytes[17], 0x15);
    }

    // --- compression round-trips ---

    #[test]
    fn test_lz4_compression_roundtrip() {
        let store = BlockStore::new();

        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        assert!(block_ref.compressed_len <= block_ref.original_len);
    }

    #[test]
    fn test_zstd_compression_roundtrip() {
        let store = BlockStore::new();

        // All-zero data is highly compressible.
        let data = vec![0u8; 8192];

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        assert!(block_ref.compressed_len < block_ref.original_len / 2);
    }

    #[test]
    fn test_compression_fallback_on_incompressible() {
        let store = BlockStore::new();

        let mut data = vec![0u8; 256];
        for (i, byte) in data.iter_mut().enumerate().take(256) {
            *byte = ((i * 17 + 31) % 256) as u8;
        }

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
    }

    #[test]
    fn test_automatic_compression_selection() {
        let store = BlockStore::new();

        let json_data = br#"{"name": "test", "value": 123, "items": [1, 2, 3]}"#.repeat(10);
        let json_ref = store.write_block(&json_data).unwrap();
        let json_recovered = store.read_block(&json_ref).unwrap();
        assert_eq!(json_recovered, json_data);

        let mut soch_data = vec![0u8; 256];
        soch_data[0..4].copy_from_slice(b"TOON");
        let soch_ref = store.write_block(&soch_data).unwrap();
        let soch_recovered = store.read_block(&soch_ref).unwrap();
        assert_eq!(soch_recovered, soch_data);
    }

    #[test]
    fn test_small_data_no_compression() {
        let store = BlockStore::new();

        // 64 bytes is below the 128-byte compression threshold.
        let small_data = vec![42u8; 64];
        let block_ref = store.write_block(&small_data).unwrap();

        assert_eq!(block_ref.compression, BlockCompression::None);

        let recovered = store.read_block(&block_ref).unwrap();
        assert_eq!(recovered, small_data);
    }

    #[test]
    fn test_compression_stats() {
        let store = BlockStore::new();

        let data = vec![0u8; 4096];
        store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(
            stats.compression_ratio > 1.0,
            "Compression should reduce size"
        );
        assert!(stats.total_original_bytes > stats.total_compressed_bytes);
    }

    // --- WAL and durability ---

    #[test]
    fn test_wal_record_header_roundtrip() {
        let original = WalRecordHeader {
            lsn: 12345,
            txn_id: 67890,
            record_type: BlockWalRecordType::BlockWrite,
            page_id: 42,
            data_len: 4096,
            crc32: 0xDEADBEEF,
        };

        let bytes = original.to_bytes();
        let recovered = WalRecordHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, original.lsn);
        assert_eq!(recovered.txn_id, original.txn_id);
        assert_eq!(recovered.record_type, original.record_type);
        assert_eq!(recovered.page_id, original.page_id);
        assert_eq!(recovered.data_len, original.data_len);
        assert_eq!(recovered.crc32, original.crc32);
    }

    #[test]
    fn test_wal_crc32() {
        let header = WalRecordHeader {
            lsn: 100,
            txn_id: 1,
            record_type: BlockWalRecordType::BlockWrite,
            page_id: 0,
            data_len: 4,
            crc32: 0,
        };

        let data = b"test";
        let crc1 = header.compute_crc32(data);
        let crc2 = header.compute_crc32(data);

        assert_eq!(crc1, crc2, "CRC should be deterministic");

        let different_data = b"TEST";
        let crc3 = header.compute_crc32(different_data);
        assert_ne!(crc1, crc3, "Different data should have different CRC");
    }

    #[test]
    fn test_durable_block_store_basic() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        let data = b"Hello, durable block store!";
        let block_ref = store.write_block(1, data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);

        store.commit(1).unwrap();

        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 1);
    }

    #[test]
    fn test_durable_block_store_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        store.write_block(1, b"block1").unwrap();
        store.write_block(1, b"block2").unwrap();
        store.write_block(1, b"block3").unwrap();
        store.commit(1).unwrap();

        let checkpoint_lsn = store.checkpoint().unwrap();
        assert!(checkpoint_lsn > 0);

        // Checkpoint drains the dirty-page table.
        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 0);
        assert_eq!(stats.checkpoint_lsn, checkpoint_lsn);
    }

    #[test]
    fn test_durable_block_store_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Write and commit, then drop the store (simulated crash).
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"data1").unwrap();
            store.write_block(1, b"data2").unwrap();
            store.commit(1).unwrap();
        }

        // Reopen and replay: committed writes must be recovered.
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            assert_eq!(stats.txns_committed, 1);
            assert_eq!(stats.blocks_recovered, 2);
            assert_eq!(stats.txns_aborted, 0);
        }
    }

    #[test]
    fn test_durable_block_store_uncommitted_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Write without committing, then drop (simulated crash).
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"uncommitted_data").unwrap();
        }

        // Replay must discard the uncommitted transaction.
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            assert_eq!(stats.txns_committed, 0);
            assert_eq!(stats.txns_aborted, 1);
            assert_eq!(stats.blocks_recovered, 0);
        }
    }

    #[test]
    fn test_wal_writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let mut writer = WalWriter::open(&wal_path).unwrap();
            writer.append(1, BlockWalRecordType::TxnBegin, 0, &[]).unwrap();
            writer
                .append(1, BlockWalRecordType::BlockWrite, 0, b"data1")
                .unwrap();
            writer
                .append(1, BlockWalRecordType::BlockWrite, 1, b"data2")
                .unwrap();
            writer.append(1, BlockWalRecordType::Commit, 0, &[]).unwrap();
            writer.sync().unwrap();
        }

        {
            let mut reader = WalReader::open(&wal_path).unwrap();
            let mut records = Vec::new();
            for record in reader.iter() {
                records.push(record.unwrap());
            }

            assert_eq!(records.len(), 4);
            assert_eq!(records[0].0.record_type, BlockWalRecordType::TxnBegin);
            assert_eq!(records[1].0.record_type, BlockWalRecordType::BlockWrite);
            assert_eq!(records[1].1, b"data1");
            assert_eq!(records[2].0.record_type, BlockWalRecordType::BlockWrite);
            assert_eq!(records[2].1, b"data2");
            assert_eq!(records[3].0.record_type, BlockWalRecordType::Commit);
        }
    }
}