use byteorder::{ByteOrder, LittleEndian};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::{Result, SochDBError};

/// Default block size: 4 KiB.
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Maximum allowed block size: 1 MiB.
pub const MAX_BLOCK_SIZE: usize = 1024 * 1024;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum BlockCompression {
    None = 0,
    Lz4 = 1,
    Zstd = 2,
}

impl BlockCompression {
    pub fn from_byte(b: u8) -> Self {
        match b {
            1 => BlockCompression::Lz4,
            2 => BlockCompression::Zstd,
            _ => BlockCompression::None,
        }
    }

    pub fn to_byte(&self) -> u8 {
        *self as u8
    }
}
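/// Reference to a single block in the store.
///
/// Wire layout produced by `to_bytes` (little-endian, 21 bytes):
/// `store_offset: u64 | compressed_len: u32 | original_len: u32 | compression: u8 | checksum: u32`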
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockRef {
    pub store_offset: u64,
    pub compressed_len: u32,
    pub original_len: u32,
    pub compression: BlockCompression,
    pub checksum: u32,
}
impl BlockRef {
    /// 8 (offset) + 4 + 4 (lengths) + 1 (compression) + 4 (checksum) = 21 bytes.
    pub const SERIALIZED_SIZE: usize = 21;

    pub fn to_bytes(&self) -> Result<[u8; Self::SERIALIZED_SIZE]> {
        let mut buf = [0u8; Self::SERIALIZED_SIZE];
        LittleEndian::write_u64(&mut buf[0..8], self.store_offset);
        LittleEndian::write_u32(&mut buf[8..12], self.compressed_len);
        LittleEndian::write_u32(&mut buf[12..16], self.original_len);
        buf[16] = self.compression as u8;
        LittleEndian::write_u32(&mut buf[17..21], self.checksum);
        Ok(buf)
    }

    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < Self::SERIALIZED_SIZE {
            return Err(SochDBError::Serialization(format!(
                "BlockRef too short: {} < {}",
                data.len(),
                Self::SERIALIZED_SIZE
            )));
        }

        Ok(Self {
            store_offset: LittleEndian::read_u64(&data[0..8]),
            compressed_len: LittleEndian::read_u32(&data[8..12]),
            original_len: LittleEndian::read_u32(&data[12..16]),
            compression: BlockCompression::from_byte(data[16]),
            checksum: LittleEndian::read_u32(&data[17..21]),
        })
    }
}
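/// Header written in front of every stored block payload.
///
/// Wire layout produced by `to_bytes` (little-endian, 17 bytes):
/// `magic: [u8; 4] = "TBLK" | compression: u8 | original_size: u32 | compressed_size: u32 | checksum: u32`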
#[derive(Debug, Clone)]
pub struct BlockHeader {
    pub magic: [u8; 4],
    pub compression: u8,
    pub original_size: u32,
    pub compressed_size: u32,
    pub checksum: u32,
}
impl BlockHeader {
    pub const MAGIC: [u8; 4] = *b"TBLK";
    /// 4 (magic) + 1 (compression) + 4 + 4 (sizes) + 4 (checksum) = 17 bytes.
    pub const SIZE: usize = 17;

    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
        let mut buf = [0u8; Self::SIZE];
        buf[0..4].copy_from_slice(&Self::MAGIC);
        buf[4] = self.compression;
        LittleEndian::write_u32(&mut buf[5..9], self.original_size);
        LittleEndian::write_u32(&mut buf[9..13], self.compressed_size);
        LittleEndian::write_u32(&mut buf[13..17], self.checksum);
        buf
    }
    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::SIZE {
            return Err(SochDBError::Corruption(format!(
                "BlockHeader too short: {} < {}",
                buf.len(),
                Self::SIZE
            )));
        }

        if buf[0..4] != Self::MAGIC {
            return Err(SochDBError::Corruption(format!(
                "Invalid block magic: expected {:?}, got {:?}",
                Self::MAGIC,
                &buf[0..4]
            )));
        }

        Ok(Self {
            magic: Self::MAGIC,
            compression: buf[4],
            original_size: LittleEndian::read_u32(&buf[5..9]),
            compressed_size: LittleEndian::read_u32(&buf[9..13]),
            checksum: LittleEndian::read_u32(&buf[13..17]),
        })
    }
}
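/// In-memory block store with transparent compression, CRC32 checksums,
/// and per-block reference counting.
///
/// A minimal usage sketch (hedged example; error handling elided):
///
/// ```ignore
/// let store = BlockStore::new();
/// let block_ref = store.write_block(b"payload")?; // codec picked automatically
/// let data = store.read_block(&block_ref)?;       // checksum verified on read
/// assert_eq!(data, b"payload");
/// ```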
pub struct BlockStore {
    data: RwLock<Vec<u8>>,
    next_offset: AtomicU64,
    index: RwLock<HashMap<u64, BlockRef>>,
    ref_counts: RwLock<HashMap<u64, u32>>,
}

impl BlockStore {
    pub fn new() -> Self {
        Self {
            data: RwLock::new(Vec::new()),
            next_offset: AtomicU64::new(0),
            index: RwLock::new(HashMap::new()),
            ref_counts: RwLock::new(HashMap::new()),
        }
    }
    /// Writes a block, picking a compression codec based on the content.
    pub fn write_block(&self, data: &[u8]) -> Result<BlockRef> {
        let compression = self.select_compression(data);
        self.write_block_with_compression(data, compression)
    }

    pub fn write_block_with_compression(
        &self,
        data: &[u8],
        compression: BlockCompression,
    ) -> Result<BlockRef> {
        let compressed = self.compress(data, compression)?;

        // Checksum covers the stored (possibly compressed) payload.
        let checksum = crc32fast::hash(&compressed);

        let header_size = BlockHeader::SIZE;
        let total_size = header_size + compressed.len();
        // Reserve the offset range atomically so concurrent writers do not overlap.
        let offset = self
            .next_offset
            .fetch_add(total_size as u64, Ordering::SeqCst);

        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: compression as u8,
            original_size: data.len() as u32,
            compressed_size: compressed.len() as u32,
            checksum,
        };

        {
            let mut store = self.data.write();
            // Grow the buffer to fit this block, but never shrink it: a
            // concurrent writer may already have extended it past this range,
            // and an unconditional resize would truncate that writer's data.
            let needed = (offset + total_size as u64) as usize;
            if store.len() < needed {
                store.resize(needed, 0);
            }

            let header_bytes = header.to_bytes();
            store[offset as usize..offset as usize + header_size].copy_from_slice(&header_bytes);

            store[offset as usize + header_size..offset as usize + total_size]
                .copy_from_slice(&compressed);
        }

        let block_ref = BlockRef {
            store_offset: offset,
            compressed_len: compressed.len() as u32,
            original_len: data.len() as u32,
            compression,
            checksum,
        };

        self.index.write().insert(offset, block_ref.clone());

        // New blocks start with a single reference.
        self.ref_counts.write().insert(offset, 1);

        Ok(block_ref)
    }
    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        let offset = block_ref.store_offset as usize;
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + block_ref.compressed_len as usize;

        let compressed = {
            let store = self.data.read();
            if offset + total_size > store.len() {
                return Err(SochDBError::NotFound("Block not found".into()));
            }

            // Parsing the header validates the magic bytes at this offset.
            let header_bytes = &store[offset..offset + header_size];
            let _header = BlockHeader::from_bytes(header_bytes)?;

            store[offset + header_size..offset + total_size].to_vec()
        };

        let checksum = crc32fast::hash(&compressed);
        if checksum != block_ref.checksum {
            return Err(SochDBError::Corruption("Block checksum mismatch".into()));
        }

        self.decompress(
            &compressed,
            block_ref.compression,
            block_ref.original_len as usize,
        )
    }
    fn select_compression(&self, data: &[u8]) -> BlockCompression {
        // Tiny payloads are not worth compressing.
        if data.len() < 128 {
            return BlockCompression::None;
        }

        if is_soch_content(data) {
            BlockCompression::Zstd
        } else if is_json_content(data) || is_compressible(data) {
            BlockCompression::Lz4
        } else {
            BlockCompression::None
        }
    }
    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                match lz4::block::compress(data, None, false) {
                    Ok(compressed) => {
                        // Fall back to raw bytes when compression does not help;
                        // decompress() detects this case by comparing lengths.
                        if compressed.len() >= data.len() {
                            Ok(data.to_vec())
                        } else {
                            Ok(compressed)
                        }
                    }
                    Err(_) => Ok(data.to_vec()),
                }
            }
            BlockCompression::Zstd => {
                match zstd::encode_all(data, 3) {
                    Ok(compressed) => {
                        if compressed.len() >= data.len() {
                            Ok(data.to_vec())
                        } else {
                            Ok(compressed)
                        }
                    }
                    Err(_) => Ok(data.to_vec()),
                }
            }
        }
    }
    fn decompress(
        &self,
        data: &[u8],
        compression: BlockCompression,
        original_size: usize,
    ) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                // compress() only stores compressed output when it is strictly
                // smaller than the input, so equal length means raw bytes.
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }

                lz4::block::decompress(data, Some(original_size as i32)).map_err(|e| {
                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
                })
            }
            BlockCompression::Zstd => {
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }

                zstd::decode_all(data).map_err(|e| {
                    SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
                })
            }
        }
    }
    pub fn add_ref(&self, offset: u64) {
        let mut refs = self.ref_counts.write();
        *refs.entry(offset).or_insert(0) += 1;
    }

    /// Drops one reference; returns `true` when the count reaches zero.
    pub fn release_ref(&self, offset: u64) -> bool {
        let mut refs = self.ref_counts.write();
        if let Some(count) = refs.get_mut(&offset) {
            *count = count.saturating_sub(1);
            return *count == 0;
        }
        false
    }
    pub fn stats(&self) -> BlockStoreStats {
        let data = self.data.read();
        let index = self.index.read();

        let mut total_original = 0u64;
        let mut total_compressed = 0u64;

        for block_ref in index.values() {
            total_original += block_ref.original_len as u64;
            total_compressed += block_ref.compressed_len as u64;
        }

        BlockStoreStats {
            total_bytes: data.len() as u64,
            block_count: index.len(),
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
            compression_ratio: if total_compressed > 0 {
                total_original as f64 / total_compressed as f64
            } else {
                1.0
            },
        }
    }
}

impl Default for BlockStore {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug, Clone, Default)]
pub struct BlockStoreStats {
    pub total_bytes: u64,
    pub block_count: usize,
    pub total_original_bytes: u64,
    pub total_compressed_bytes: u64,
    /// Original bytes divided by compressed bytes; values above 1.0 mean savings.
    pub compression_ratio: f64,
}
/// Heuristic: TOON/SochDB-style content tends to contain `[`, `{`, and `:`
/// within its first 100 bytes.
pub fn is_soch_content(data: &[u8]) -> bool {
    if data.len() < 10 {
        return false;
    }

    let s = String::from_utf8_lossy(&data[..data.len().min(100)]);
    s.contains('[') && s.contains('{') && s.contains(':')
}

/// Heuristic: JSON documents start with `{` or `[`.
pub fn is_json_content(data: &[u8]) -> bool {
    if data.is_empty() {
        return false;
    }

    let first = data[0];
    first == b'{' || first == b'['
}

/// Heuristic: data counts as compressible when fewer than half of the
/// sampled bytes are distinct (a cheap stand-in for an entropy estimate).
pub fn is_compressible(data: &[u8]) -> bool {
    if data.len() < 64 {
        return false;
    }

    let sample_size = data.len().min(256);
    let mut seen = [false; 256];
    let mut unique = 0;

    for &byte in &data[..sample_size] {
        if !seen[byte as usize] {
            seen[byte as usize] = true;
            unique += 1;
        }
    }

    unique < sample_size / 2
}
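/// Splits a file into fixed-size chunks and stores each chunk as a block.
///
/// A minimal usage sketch (hedged example; `file_bytes` is illustrative):
///
/// ```ignore
/// let manager = FileBlockManager::new(DEFAULT_BLOCK_SIZE);
/// let blocks = manager.write_file(&file_bytes)?; // one BlockRef per chunk
/// let restored = manager.read_file(&blocks)?;    // chunks concatenated in order
/// assert_eq!(restored, file_bytes);
/// ```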
pub struct FileBlockManager {
    store: BlockStore,
    block_size: usize,
}

impl FileBlockManager {
    pub fn new(block_size: usize) -> Self {
        Self {
            store: BlockStore::new(),
            block_size: block_size.min(MAX_BLOCK_SIZE),
        }
    }

    pub fn write_file(&self, data: &[u8]) -> Result<Vec<BlockRef>> {
        let mut blocks = Vec::new();

        for chunk in data.chunks(self.block_size) {
            let block_ref = self.store.write_block(chunk)?;
            blocks.push(block_ref);
        }

        Ok(blocks)
    }

    pub fn read_file(&self, blocks: &[BlockRef]) -> Result<Vec<u8>> {
        let mut data = Vec::new();

        for block_ref in blocks {
            let block_data = self.store.read_block(block_ref)?;
            data.extend(block_data);
        }

        Ok(data)
    }

    pub fn stats(&self) -> BlockStoreStats {
        self.store.stats()
    }
}

impl Default for FileBlockManager {
    fn default() -> Self {
        Self::new(DEFAULT_BLOCK_SIZE)
    }
}

// ---- Write-ahead log (WAL) and durable storage ----

use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    BlockWrite = 1,
    Checkpoint = 2,
    Commit = 3,
    TxnBegin = 4,
}

impl WalRecordType {
    fn from_byte(b: u8) -> Option<Self> {
        match b {
            1 => Some(WalRecordType::BlockWrite),
            2 => Some(WalRecordType::Checkpoint),
            3 => Some(WalRecordType::Commit),
            4 => Some(WalRecordType::TxnBegin),
            _ => None,
        }
    }
}
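/// Fixed-size header preceding every WAL record payload.
///
/// Wire layout produced by `to_bytes` (little-endian, 33 bytes):
/// `lsn: u64 | txn_id: u64 | record_type: u8 | page_id: u64 | data_len: u32 | crc32: u32`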
#[derive(Debug, Clone)]
pub struct WalRecordHeader {
    pub lsn: u64,
    pub txn_id: u64,
    pub record_type: WalRecordType,
    pub page_id: u64,
    pub data_len: u32,
    pub crc32: u32,
}

impl WalRecordHeader {
    pub const SIZE: usize = 33;

    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
        let mut buf = [0u8; Self::SIZE];
        LittleEndian::write_u64(&mut buf[0..8], self.lsn);
        LittleEndian::write_u64(&mut buf[8..16], self.txn_id);
        buf[16] = self.record_type as u8;
        LittleEndian::write_u64(&mut buf[17..25], self.page_id);
        LittleEndian::write_u32(&mut buf[25..29], self.data_len);
        LittleEndian::write_u32(&mut buf[29..33], self.crc32);
        buf
    }

    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::SIZE {
            return Err(SochDBError::Corruption(format!(
                "WAL record header too short: {} < {}",
                buf.len(),
                Self::SIZE
            )));
        }

        let record_type = WalRecordType::from_byte(buf[16]).ok_or_else(|| {
            SochDBError::Corruption(format!("Invalid WAL record type: {}", buf[16]))
        })?;

        Ok(Self {
            lsn: LittleEndian::read_u64(&buf[0..8]),
            txn_id: LittleEndian::read_u64(&buf[8..16]),
            record_type,
            page_id: LittleEndian::read_u64(&buf[17..25]),
            data_len: LittleEndian::read_u32(&buf[25..29]),
            crc32: LittleEndian::read_u32(&buf[29..33]),
        })
    }

    /// CRC over every header field except `crc32` itself (the first 29 bytes
    /// of the wire layout), followed by the record payload.
    pub fn compute_crc32(&self, data: &[u8]) -> u32 {
        let mut hasher = crc32fast::Hasher::new();

        let mut header_buf = [0u8; 29];
        LittleEndian::write_u64(&mut header_buf[0..8], self.lsn);
        LittleEndian::write_u64(&mut header_buf[8..16], self.txn_id);
        header_buf[16] = self.record_type as u8;
        LittleEndian::write_u64(&mut header_buf[17..25], self.page_id);
        LittleEndian::write_u32(&mut header_buf[25..29], self.data_len);

        hasher.update(&header_buf);
        hasher.update(data);
        hasher.finalize()
    }
}
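/// Append-only WAL writer. LSNs are byte offsets into the log file.
///
/// A minimal sketch of the intended call pattern (hedged example; the path
/// and IDs are illustrative):
///
/// ```ignore
/// let mut wal = WalWriter::open("wal.log")?;
/// let lsn = wal.append(txn_id, WalRecordType::BlockWrite, page_id, b"payload")?;
/// wal.sync()?; // flush + fsync before acknowledging a commit
/// ```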
pub struct WalWriter {
    file: BufWriter<File>,
    next_lsn: u64,
    #[allow(dead_code)]
    path: PathBuf,
}

impl WalWriter {
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&path)?;

        // LSNs are byte offsets, so the next LSN is the current file length.
        let metadata = file.metadata()?;
        let next_lsn = metadata.len();

        Ok(Self {
            file: BufWriter::new(file),
            next_lsn,
            path,
        })
    }

    pub fn append(
        &mut self,
        txn_id: u64,
        record_type: WalRecordType,
        page_id: u64,
        data: &[u8],
    ) -> Result<u64> {
        let lsn = self.next_lsn;

        let mut header = WalRecordHeader {
            lsn,
            txn_id,
            record_type,
            page_id,
            data_len: data.len() as u32,
            crc32: 0, // filled in below
        };

        header.crc32 = header.compute_crc32(data);

        let header_bytes = header.to_bytes();
        self.file.write_all(&header_bytes)?;

        self.file.write_all(data)?;

        self.next_lsn += WalRecordHeader::SIZE as u64 + data.len() as u64;

        Ok(lsn)
    }

    /// Flushes buffered writes and fsyncs the file to disk.
    pub fn sync(&mut self) -> Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        Ok(())
    }

    pub fn current_lsn(&self) -> u64 {
        self.next_lsn
    }
}
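/// Sequential WAL reader that verifies the CRC of every record it returns.
///
/// A minimal recovery-scan sketch (hedged example):
///
/// ```ignore
/// let mut reader = WalReader::open("wal.log")?;
/// for record in reader.iter() {
///     let (header, payload) = record?; // Err on corrupt or torn records
///     // apply `payload` for `header.page_id` as recovery dictates
/// }
/// ```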
pub struct WalReader {
    reader: BufReader<File>,
    #[allow(dead_code)]
    path: PathBuf,
}

impl WalReader {
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = File::open(&path)?;

        Ok(Self {
            reader: BufReader::new(file),
            path,
        })
    }

    /// Reads the next record, returning `Ok(None)` at a clean end of log.
    pub fn read_next(&mut self) -> Result<Option<(WalRecordHeader, Vec<u8>)>> {
        let mut header_buf = [0u8; WalRecordHeader::SIZE];
        match self.reader.read_exact(&mut header_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        }

        let header = WalRecordHeader::from_bytes(&header_buf)?;

        let mut data = vec![0u8; header.data_len as usize];
        self.reader.read_exact(&mut data)?;

        let computed_crc = header.compute_crc32(&data);
        if computed_crc != header.crc32 {
            return Err(SochDBError::Corruption(format!(
                "WAL CRC mismatch at LSN {}: expected {:#x}, got {:#x}",
                header.lsn, header.crc32, computed_crc
            )));
        }

        Ok(Some((header, data)))
    }

    pub fn iter(&mut self) -> WalIterator<'_> {
        WalIterator { reader: self }
    }
}

pub struct WalIterator<'a> {
    reader: &'a mut WalReader,
}

impl<'a> Iterator for WalIterator<'a> {
    type Item = Result<(WalRecordHeader, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.read_next() {
            Ok(Some(record)) => Some(Ok(record)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}
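/// Durable block store: every write is appended to the WAL and fsynced
/// before it is applied to the in-memory store (the write-ahead rule);
/// `checkpoint` then flushes dirty pages to the data file.
///
/// A minimal write/commit/checkpoint sketch (hedged example; the path and
/// transaction ID are illustrative):
///
/// ```ignore
/// let store = DurableBlockStore::open("/path/to/db")?;
/// let block_ref = store.write_block(42, b"payload")?; // WAL first, then memory
/// store.commit(42)?;                                  // durable once this returns
/// store.checkpoint()?;                                // flush dirty pages to blocks.dat
/// ```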
pub struct DurableBlockStore {
    store: BlockStore,
    wal: parking_lot::Mutex<WalWriter>,
    data_file: parking_lot::Mutex<File>,
    dirty_pages: RwLock<HashMap<u64, Vec<u8>>>,
    checkpoint_lsn: AtomicU64,
    data_dir: PathBuf,
    next_page_id: AtomicU64,
}

impl DurableBlockStore {
    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();

        std::fs::create_dir_all(&data_dir)?;

        let wal_path = data_dir.join("wal.log");
        let data_path = data_dir.join("blocks.dat");

        let wal = WalWriter::open(&wal_path)?;

        let data_file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&data_path)?;

        let store = Self {
            store: BlockStore::new(),
            wal: parking_lot::Mutex::new(wal),
            data_file: parking_lot::Mutex::new(data_file),
            dirty_pages: RwLock::new(HashMap::new()),
            checkpoint_lsn: AtomicU64::new(0),
            data_dir,
            next_page_id: AtomicU64::new(0),
        };

        Ok(store)
    }

    pub fn write_block(&self, txn_id: u64, data: &[u8]) -> Result<BlockRef> {
        let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);

        // Write-ahead rule: log and sync before touching the store.
        {
            let mut wal = self.wal.lock();
            wal.append(txn_id, WalRecordType::BlockWrite, page_id, data)?;
            wal.sync()?;
        }

        let block_ref = self.store.write_block(data)?;

        self.dirty_pages.write().insert(page_id, data.to_vec());

        Ok(block_ref)
    }

    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        self.store.read_block(block_ref)
    }

    pub fn commit(&self, txn_id: u64) -> Result<u64> {
        let mut wal = self.wal.lock();
        let lsn = wal.append(txn_id, WalRecordType::Commit, 0, &[])?;
        wal.sync()?;
        Ok(lsn)
    }

    /// Flushes all dirty pages to the data file, then logs a checkpoint record.
    pub fn checkpoint(&self) -> Result<u64> {
        let dirty: Vec<(u64, Vec<u8>)> = {
            let mut pages = self.dirty_pages.write();
            pages.drain().collect()
        };

        {
            let mut file = self.data_file.lock();
            for (page_id, data) in &dirty {
                // Pages live at fixed-stride offsets in blocks.dat.
                let offset = *page_id * (DEFAULT_BLOCK_SIZE as u64 + BlockHeader::SIZE as u64);
                file.seek(SeekFrom::Start(offset))?;
                file.write_all(data)?;
            }
            file.sync_all()?;
        }

        let lsn = {
            let mut wal = self.wal.lock();
            let lsn = wal.append(0, WalRecordType::Checkpoint, 0, &[])?;
            wal.sync()?;
            lsn
        };

        self.checkpoint_lsn.store(lsn, Ordering::SeqCst);

        Ok(lsn)
    }

    /// Replays the WAL: collects writes per transaction, then re-applies only
    /// those belonging to committed transactions (redo-only recovery).
    pub fn recover(&mut self) -> Result<RecoveryStats> {
        let wal_path = self.data_dir.join("wal.log");

        if !wal_path.exists() {
            return Ok(RecoveryStats::default());
        }

        let mut reader = WalReader::open(&wal_path)?;
        let mut stats = RecoveryStats::default();

        let mut pending_txns: HashMap<u64, Vec<(u64, Vec<u8>)>> = HashMap::new();
        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();

        for record_result in reader.iter() {
            let (header, data) = record_result?;
            stats.records_read += 1;

            match header.record_type {
                WalRecordType::BlockWrite => {
                    pending_txns
                        .entry(header.txn_id)
                        .or_default()
                        .push((header.page_id, data));
                }
                WalRecordType::Commit => {
                    committed_txns.insert(header.txn_id);
                    stats.txns_committed += 1;
                }
                WalRecordType::Checkpoint => {
                    self.checkpoint_lsn.store(header.lsn, Ordering::SeqCst);
                    stats.checkpoints_found += 1;
                }
                WalRecordType::TxnBegin => {
                    // No action needed; begin records are informational.
                }
            }
        }

        for txn_id in &committed_txns {
            if let Some(writes) = pending_txns.remove(txn_id) {
                for (page_id, data) in writes {
                    self.store.write_block(&data)?;
                    self.next_page_id.fetch_max(page_id + 1, Ordering::SeqCst);
                    stats.blocks_recovered += 1;
                }
            }
        }

        // Whatever remains in pending_txns never committed.
        stats.txns_aborted = pending_txns.len();

        Ok(stats)
    }

    pub fn stats(&self) -> DurableBlockStoreStats {
        let store_stats = self.store.stats();
        DurableBlockStoreStats {
            block_stats: store_stats,
            dirty_page_count: self.dirty_pages.read().len(),
            checkpoint_lsn: self.checkpoint_lsn.load(Ordering::SeqCst),
            wal_size: self.wal.lock().current_lsn(),
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    pub records_read: usize,
    pub txns_committed: usize,
    pub txns_aborted: usize,
    pub blocks_recovered: usize,
    pub checkpoints_found: usize,
}

#[derive(Debug, Clone)]
pub struct DurableBlockStoreStats {
    pub block_stats: BlockStoreStats,
    pub dirty_page_count: usize,
    pub checkpoint_lsn: u64,
    pub wal_size: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_block_store_write_read() {
        let store = BlockStore::new();

        let data = b"Hello, SochFS block storage!";
        let block_ref = store.write_block(data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_compression_selection() {
        let store = BlockStore::new();

        let small = b"hi";
        assert_eq!(store.select_compression(small), BlockCompression::None);

        let toon = b"users[5]{id,name}:\n1,Alice\n2,Bob\n3,Charlie";
        let compression = store.select_compression(toon);
        assert!(compression == BlockCompression::Zstd || compression == BlockCompression::None);

        let json = br#"{"users": [{"id": 1, "name": "Alice"}]}"#;
        let compression = store.select_compression(json);
        assert!(compression == BlockCompression::Lz4 || compression == BlockCompression::None);
    }

    #[test]
    fn test_lz4_compression() {
        let store = BlockStore::new();

        let data = "Hello, world! ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Lz4)
            .unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_zstd_compression() {
        let store = BlockStore::new();

        let data = "TOON format is very repetitive ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Zstd)
            .unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_file_block_manager() {
        let manager = FileBlockManager::new(1024);

        // 5000 bytes across 1 KiB blocks should span multiple blocks.
        let data = "Test data ".repeat(500);
        let blocks = manager.write_file(data.as_bytes()).unwrap();

        assert!(blocks.len() > 1);

        let read_data = manager.read_file(&blocks).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_stats() {
        let store = BlockStore::new();

        let data = "Repetitive data pattern ".repeat(100);
        store.write_block(data.as_bytes()).unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(stats.compression_ratio >= 1.0);
    }
    #[test]
    fn test_block_header_fixed_layout() {
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Zstd as u8,
            original_size: 4096,
            compressed_size: 1024,
            checksum: 0xDEADBEEF,
        };

        let bytes = header.to_bytes();

        assert_eq!(bytes.len(), 17);
        assert_eq!(&bytes[0..4], b"TBLK");
        assert_eq!(bytes[4], BlockCompression::Zstd as u8);
        assert_eq!(LittleEndian::read_u32(&bytes[5..9]), 4096);
        assert_eq!(LittleEndian::read_u32(&bytes[9..13]), 1024);
        assert_eq!(LittleEndian::read_u32(&bytes[13..17]), 0xDEADBEEF);
    }

    #[test]
    fn test_block_header_roundtrip() {
        let original = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Lz4 as u8,
            original_size: 65536,
            compressed_size: 32768,
            checksum: 0x12345678,
        };

        let bytes = original.to_bytes();
        let recovered = BlockHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.original_size, original.original_size);
        assert_eq!(recovered.compressed_size, original.compressed_size);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_header_invalid_magic() {
        let mut bytes = [0u8; 17];
        bytes[0..4].copy_from_slice(b"XXXX"); // wrong magic

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid block magic"));
    }

    #[test]
    fn test_block_header_too_short() {
        let bytes = [0u8; 10]; // shorter than BlockHeader::SIZE

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    #[test]
    fn test_block_ref_fixed_layout() {
        let block_ref = BlockRef {
            store_offset: 0x123456789ABCDEF0,
            compressed_len: 4096,
            original_len: 8192,
            compression: BlockCompression::Zstd,
            checksum: 0xCAFEBABE,
        };

        let bytes = block_ref.to_bytes().unwrap();

        assert_eq!(bytes.len(), 21);
        assert_eq!(LittleEndian::read_u64(&bytes[0..8]), 0x123456789ABCDEF0);
        assert_eq!(LittleEndian::read_u32(&bytes[8..12]), 4096);
        assert_eq!(LittleEndian::read_u32(&bytes[12..16]), 8192);
        assert_eq!(bytes[16], BlockCompression::Zstd as u8);
        assert_eq!(LittleEndian::read_u32(&bytes[17..21]), 0xCAFEBABE);
    }

    #[test]
    fn test_block_ref_roundtrip() {
        let original = BlockRef {
            store_offset: u64::MAX,
            compressed_len: u32::MAX,
            original_len: u32::MAX,
            compression: BlockCompression::None,
            checksum: u32::MAX,
        };

        let bytes = original.to_bytes().unwrap();
        let recovered = BlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.store_offset, original.store_offset);
        assert_eq!(recovered.compressed_len, original.compressed_len);
        assert_eq!(recovered.original_len, original.original_len);
        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_ref_too_short() {
        let bytes = [0u8; 10]; // shorter than SERIALIZED_SIZE

        let result = BlockRef::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlockRef too short"));
    }

    #[test]
    fn test_cross_platform_compatibility() {
        let block_ref = BlockRef {
            store_offset: 0x0102030405060708,
            compressed_len: 0x0A0B0C0D,
            original_len: 0x0E0F1011,
            compression: BlockCompression::Lz4,
            checksum: 0x12131415,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // Little-endian layout: least significant byte first.
        assert_eq!(bytes[0], 0x08); // low byte of store_offset
        assert_eq!(bytes[7], 0x01); // high byte of store_offset
        assert_eq!(bytes[8], 0x0D); // low byte of compressed_len
        assert_eq!(bytes[17], 0x15); // low byte of checksum
    }
    #[test]
    fn test_lz4_compression_roundtrip() {
        let store = BlockStore::new();

        // Repeating 0..255 pattern compresses well under LZ4.
        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        assert!(block_ref.compressed_len <= block_ref.original_len);
    }

    #[test]
    fn test_zstd_compression_roundtrip() {
        let store = BlockStore::new();

        // All zeros: zstd should shrink this dramatically.
        let data = vec![0u8; 8192];

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        assert!(block_ref.compressed_len < block_ref.original_len / 2);
    }

    #[test]
    fn test_compression_fallback_on_incompressible() {
        let store = BlockStore::new();

        // 17 is coprime with 256, so this fills the buffer with a permutation
        // of all 256 byte values: effectively incompressible.
        let mut data = vec![0u8; 256];
        for (i, byte) in data.iter_mut().enumerate().take(256) {
            *byte = ((i * 17 + 31) % 256) as u8;
        }

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
    }

    #[test]
    fn test_automatic_compression_selection() {
        let store = BlockStore::new();

        let json_data = br#"{"name": "test", "value": 123, "items": [1, 2, 3]}"#.repeat(10);
        let json_ref = store.write_block(&json_data).unwrap();
        let json_recovered = store.read_block(&json_ref).unwrap();
        assert_eq!(json_recovered, json_data);

        let mut soch_data = vec![0u8; 256];
        soch_data[0..4].copy_from_slice(b"TOON");

        let soch_ref = store.write_block(&soch_data).unwrap();
        let soch_recovered = store.read_block(&soch_ref).unwrap();
        assert_eq!(soch_recovered, soch_data);
    }

    #[test]
    fn test_small_data_no_compression() {
        let store = BlockStore::new();

        // 64 bytes is below the 128-byte compression threshold.
        let small_data = vec![42u8; 64];
        let block_ref = store.write_block(&small_data).unwrap();

        assert_eq!(block_ref.compression, BlockCompression::None);

        let recovered = store.read_block(&block_ref).unwrap();
        assert_eq!(recovered, small_data);
    }

    #[test]
    fn test_compression_stats() {
        let store = BlockStore::new();

        let data = vec![0u8; 4096];
        store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(
            stats.compression_ratio > 1.0,
            "Compression should reduce size"
        );
        assert!(stats.total_original_bytes > stats.total_compressed_bytes);
    }
    #[test]
    fn test_wal_record_header_roundtrip() {
        let original = WalRecordHeader {
            lsn: 12345,
            txn_id: 67890,
            record_type: WalRecordType::BlockWrite,
            page_id: 42,
            data_len: 4096,
            crc32: 0xDEADBEEF,
        };

        let bytes = original.to_bytes();
        let recovered = WalRecordHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, original.lsn);
        assert_eq!(recovered.txn_id, original.txn_id);
        assert_eq!(recovered.record_type, original.record_type);
        assert_eq!(recovered.page_id, original.page_id);
        assert_eq!(recovered.data_len, original.data_len);
        assert_eq!(recovered.crc32, original.crc32);
    }

    #[test]
    fn test_wal_crc32() {
        let header = WalRecordHeader {
            lsn: 100,
            txn_id: 1,
            record_type: WalRecordType::BlockWrite,
            page_id: 0,
            data_len: 4,
            crc32: 0,
        };

        let data = b"test";
        let crc1 = header.compute_crc32(data);
        let crc2 = header.compute_crc32(data);

        assert_eq!(crc1, crc2, "CRC should be deterministic");

        let different_data = b"TEST";
        let crc3 = header.compute_crc32(different_data);
        assert_ne!(crc1, crc3, "Different data should have different CRC");
    }

    #[test]
    fn test_durable_block_store_basic() {
        let dir = tempfile::tempdir().unwrap();
        let store = DurableBlockStore::open(dir.path()).unwrap();

        let data = b"Hello, durable block store!";
        let block_ref = store.write_block(1, data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);

        store.commit(1).unwrap();

        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 1);
    }

    #[test]
    fn test_durable_block_store_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = DurableBlockStore::open(dir.path()).unwrap();

        store.write_block(1, b"block1").unwrap();
        store.write_block(1, b"block2").unwrap();
        store.write_block(1, b"block3").unwrap();
        store.commit(1).unwrap();

        let checkpoint_lsn = store.checkpoint().unwrap();
        assert!(checkpoint_lsn > 0);

        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 0);
        assert_eq!(stats.checkpoint_lsn, checkpoint_lsn);
    }

    #[test]
    fn test_durable_block_store_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Write and commit, then drop the store to simulate a restart.
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"data1").unwrap();
            store.write_block(1, b"data2").unwrap();
            store.commit(1).unwrap();
        }

        // Reopen and replay the WAL.
        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            assert_eq!(stats.txns_committed, 1);
            assert_eq!(stats.blocks_recovered, 2);
            assert_eq!(stats.txns_aborted, 0);
        }
    }

    #[test]
    fn test_durable_block_store_uncommitted_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Write without committing: recovery must discard this transaction.
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"uncommitted_data").unwrap();
        }

        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            assert_eq!(stats.txns_committed, 0);
            assert_eq!(stats.txns_aborted, 1);
            assert_eq!(stats.blocks_recovered, 0);
        }
    }
    #[test]
    fn test_wal_writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let mut writer = WalWriter::open(&wal_path).unwrap();
            writer.append(1, WalRecordType::TxnBegin, 0, &[]).unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 0, b"data1")
                .unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 1, b"data2")
                .unwrap();
            writer.append(1, WalRecordType::Commit, 0, &[]).unwrap();
            writer.sync().unwrap();
        }

        {
            let mut reader = WalReader::open(&wal_path).unwrap();
            let mut records = Vec::new();
            for record in reader.iter() {
                records.push(record.unwrap());
            }

            assert_eq!(records.len(), 4);
            assert_eq!(records[0].0.record_type, WalRecordType::TxnBegin);
            assert_eq!(records[1].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[1].1, b"data1");
            assert_eq!(records[2].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[2].1, b"data2");
            assert_eq!(records[3].0.record_type, WalRecordType::Commit);
        }
    }
}