use byteorder::{ByteOrder, LittleEndian};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::{Result, SochDBError};

/// Default block size (4 KiB).
pub const DEFAULT_BLOCK_SIZE: usize = 4096;

/// Maximum allowed block size (1 MiB).
pub const MAX_BLOCK_SIZE: usize = 1024 * 1024;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum BlockCompression {
    None = 0,
    Lz4 = 1,
    Zstd = 2,
}

impl BlockCompression {
    pub fn from_byte(b: u8) -> Self {
        match b {
            1 => BlockCompression::Lz4,
            2 => BlockCompression::Zstd,
            _ => BlockCompression::None,
        }
    }

    pub fn to_byte(&self) -> u8 {
        *self as u8
    }
}

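/// Reference to a block stored in a [`BlockStore`]: where it lives, how big
/// it is before and after compression, and its payload checksum.
///
/// A minimal serialization round-trip sketch. Marked `ignore` because the
/// crate/module path is not confirmed by this file; it assumes these items
/// are already in scope:
///
/// ```ignore
/// let r = BlockRef {
///     store_offset: 0,
///     compressed_len: 4,
///     original_len: 4,
///     compression: BlockCompression::None,
///     checksum: crc32fast::hash(b"data"),
/// };
/// let bytes = r.to_bytes()?;                // 21-byte little-endian layout
/// let back = BlockRef::from_bytes(&bytes)?;
/// assert_eq!(back.compressed_len, r.compressed_len);
/// ```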
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockRef {
    /// Byte offset of the block (header included) within the store.
    pub store_offset: u64,
    /// Length of the stored (possibly compressed) payload.
    pub compressed_len: u32,
    /// Length of the payload before compression.
    pub original_len: u32,
    /// Compression algorithm used for this block.
    pub compression: BlockCompression,
    /// CRC32 of the stored payload.
    pub checksum: u32,
}

impl BlockRef {
    /// Fixed wire size: 8 + 4 + 4 + 1 + 4 bytes.
    pub const SERIALIZED_SIZE: usize = 21;

    pub fn to_bytes(&self) -> Result<[u8; Self::SERIALIZED_SIZE]> {
        let mut buf = [0u8; Self::SERIALIZED_SIZE];
        LittleEndian::write_u64(&mut buf[0..8], self.store_offset);
        LittleEndian::write_u32(&mut buf[8..12], self.compressed_len);
        LittleEndian::write_u32(&mut buf[12..16], self.original_len);
        buf[16] = self.compression as u8;
        LittleEndian::write_u32(&mut buf[17..21], self.checksum);
        Ok(buf)
    }

    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < Self::SERIALIZED_SIZE {
            return Err(SochDBError::Serialization(format!(
                "BlockRef too short: {} < {}",
                data.len(),
                Self::SERIALIZED_SIZE
            )));
        }

        Ok(Self {
            store_offset: LittleEndian::read_u64(&data[0..8]),
            compressed_len: LittleEndian::read_u32(&data[8..12]),
            original_len: LittleEndian::read_u32(&data[12..16]),
            compression: BlockCompression::from_byte(data[16]),
            checksum: LittleEndian::read_u32(&data[17..21]),
        })
    }
}

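/// On-disk header prepended to every block payload.
///
/// Layout (17 bytes, little-endian), as recovered from `to_bytes` below:
///
/// | bytes  | field           |
/// |--------|-----------------|
/// | 0..4   | magic `"TBLK"`  |
/// | 4      | compression     |
/// | 5..9   | original_size   |
/// | 9..13  | compressed_size |
/// | 13..17 | checksum        |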
#[derive(Debug, Clone)]
pub struct BlockHeader {
    pub magic: [u8; 4],
    pub compression: u8,
    pub original_size: u32,
    pub compressed_size: u32,
    pub checksum: u32,
}

impl BlockHeader {
    pub const MAGIC: [u8; 4] = *b"TBLK";
    /// Fixed header size: 4 + 1 + 4 + 4 + 4 bytes.
    pub const SIZE: usize = 17;

    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
        let mut buf = [0u8; Self::SIZE];
        buf[0..4].copy_from_slice(&Self::MAGIC);
        buf[4] = self.compression;
        LittleEndian::write_u32(&mut buf[5..9], self.original_size);
        LittleEndian::write_u32(&mut buf[9..13], self.compressed_size);
        LittleEndian::write_u32(&mut buf[13..17], self.checksum);
        buf
    }

    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::SIZE {
            return Err(SochDBError::Corruption(format!(
                "BlockHeader too short: {} < {}",
                buf.len(),
                Self::SIZE
            )));
        }

        if buf[0..4] != Self::MAGIC {
            return Err(SochDBError::Corruption(format!(
                "Invalid block magic: expected {:?}, got {:?}",
                Self::MAGIC,
                &buf[0..4]
            )));
        }

        Ok(Self {
            magic: Self::MAGIC,
            compression: buf[4],
            original_size: LittleEndian::read_u32(&buf[5..9]),
            compressed_size: LittleEndian::read_u32(&buf[9..13]),
            checksum: LittleEndian::read_u32(&buf[13..17]),
        })
    }
}

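/// In-memory, append-only block store with transparent compression and
/// per-block reference counting.
///
/// A minimal write/read sketch (`ignore`: the surrounding crate name is not
/// confirmed here; assumes the items are in scope):
///
/// ```ignore
/// let store = BlockStore::new();
/// let r = store.write_block(b"hello block")?;
/// assert_eq!(store.read_block(&r)?, b"hello block");
/// ```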
pub struct BlockStore {
    /// Backing buffer holding headers and payloads contiguously.
    data: RwLock<Vec<u8>>,
    /// Next free byte offset in `data`.
    next_offset: AtomicU64,
    /// Offset -> block metadata.
    index: RwLock<HashMap<u64, BlockRef>>,
    /// Offset -> reference count.
    ref_counts: RwLock<HashMap<u64, u32>>,
}

impl BlockStore {
    pub fn new() -> Self {
        Self {
            data: RwLock::new(Vec::new()),
            next_offset: AtomicU64::new(0),
            index: RwLock::new(HashMap::new()),
            ref_counts: RwLock::new(HashMap::new()),
        }
    }

    /// Write a block, choosing a compression algorithm heuristically.
    pub fn write_block(&self, data: &[u8]) -> Result<BlockRef> {
        let compression = self.select_compression(data);
        self.write_block_with_compression(data, compression)
    }

    pub fn write_block_with_compression(
        &self,
        data: &[u8],
        compression: BlockCompression,
    ) -> Result<BlockRef> {
        let compressed = self.compress(data, compression)?;

        // The checksum covers the stored (compressed) payload, so reads can
        // verify integrity before decompressing.
        let checksum = crc32fast::hash(&compressed);

        let header_size = BlockHeader::SIZE;
        let total_size = header_size + compressed.len();
        let offset = self
            .next_offset
            .fetch_add(total_size as u64, Ordering::SeqCst);

        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: compression as u8,
            original_size: data.len() as u32,
            compressed_size: compressed.len() as u32,
            checksum,
        };

        {
            let mut store = self.data.write();
            store.resize((offset + total_size as u64) as usize, 0);

            let header_bytes = header.to_bytes();
            store[offset as usize..offset as usize + header_size].copy_from_slice(&header_bytes);

            store[offset as usize + header_size..offset as usize + total_size]
                .copy_from_slice(&compressed);
        }

        let block_ref = BlockRef {
            store_offset: offset,
            compressed_len: compressed.len() as u32,
            original_len: data.len() as u32,
            compression,
            checksum,
        };

        self.index.write().insert(offset, block_ref.clone());

        self.ref_counts.write().insert(offset, 1);

        Ok(block_ref)
    }

    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        let offset = block_ref.store_offset as usize;
        let header_size = BlockHeader::SIZE;
        let total_size = header_size + block_ref.compressed_len as usize;

        let compressed = {
            let store = self.data.read();
            if offset + total_size > store.len() {
                return Err(SochDBError::NotFound("Block not found".into()));
            }

            // Parse the header to validate the magic bytes, even though the
            // lengths come from the caller's BlockRef.
            let header_bytes = &store[offset..offset + header_size];
            let _header = BlockHeader::from_bytes(header_bytes)?;

            store[offset + header_size..offset + total_size].to_vec()
        };

        let checksum = crc32fast::hash(&compressed);
        if checksum != block_ref.checksum {
            return Err(SochDBError::Corruption("Block checksum mismatch".into()));
        }

        self.decompress(
            &compressed,
            block_ref.compression,
            block_ref.original_len as usize,
        )
    }

    fn select_compression(&self, data: &[u8]) -> BlockCompression {
        // Tiny payloads are not worth the header and CPU overhead.
        if data.len() < 128 {
            return BlockCompression::None;
        }

        if is_soch_content(data) {
            BlockCompression::Zstd
        } else if is_json_content(data) || is_compressible(data) {
            BlockCompression::Lz4
        } else {
            BlockCompression::None
        }
    }

    fn compress(&self, data: &[u8], compression: BlockCompression) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                match lz4::block::compress(data, None, false) {
                    Ok(compressed) => {
                        // Fall back to the raw bytes if compression did not
                        // actually shrink the payload.
                        if compressed.len() >= data.len() {
                            Ok(data.to_vec())
                        } else {
                            Ok(compressed)
                        }
                    }
                    // On compressor failure, store uncompressed.
                    Err(_) => Ok(data.to_vec()),
                }
            }
            BlockCompression::Zstd => {
                match zstd::encode_all(data, 3) {
                    Ok(compressed) => {
                        if compressed.len() >= data.len() {
                            Ok(data.to_vec())
                        } else {
                            Ok(compressed)
                        }
                    }
                    Err(_) => Ok(data.to_vec()),
                }
            }
        }
    }

    fn decompress(
        &self,
        data: &[u8],
        compression: BlockCompression,
        original_size: usize,
    ) -> Result<Vec<u8>> {
        match compression {
            BlockCompression::None => Ok(data.to_vec()),
            BlockCompression::Lz4 => {
                // `compress` stores raw bytes whenever compression does not
                // shrink the payload, so equal lengths mean the block was
                // stored uncompressed.
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }

                lz4::block::decompress(data, Some(original_size as i32)).map_err(|e| {
                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
                })
            }
            BlockCompression::Zstd => {
                if data.len() == original_size {
                    return Ok(data.to_vec());
                }

                zstd::decode_all(data).map_err(|e| {
                    SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
                })
            }
        }
    }

    /// Increment the reference count for the block at `offset`.
    pub fn add_ref(&self, offset: u64) {
        let mut refs = self.ref_counts.write();
        *refs.entry(offset).or_insert(0) += 1;
    }

    /// Decrement the reference count; returns `true` when it reaches zero.
    pub fn release_ref(&self, offset: u64) -> bool {
        let mut refs = self.ref_counts.write();
        if let Some(count) = refs.get_mut(&offset) {
            *count = count.saturating_sub(1);
            return *count == 0;
        }
        false
    }

    pub fn stats(&self) -> BlockStoreStats {
        let data = self.data.read();
        let index = self.index.read();

        let mut total_original = 0u64;
        let mut total_compressed = 0u64;

        for block_ref in index.values() {
            total_original += block_ref.original_len as u64;
            total_compressed += block_ref.compressed_len as u64;
        }

        BlockStoreStats {
            total_bytes: data.len() as u64,
            block_count: index.len(),
            total_original_bytes: total_original,
            total_compressed_bytes: total_compressed,
            compression_ratio: if total_compressed > 0 {
                total_original as f64 / total_compressed as f64
            } else {
                1.0
            },
        }
    }
}

impl Default for BlockStore {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug, Clone, Default)]
pub struct BlockStoreStats {
    /// Total bytes in the backing buffer, headers included.
    pub total_bytes: u64,
    /// Number of blocks in the index.
    pub block_count: usize,
    /// Sum of uncompressed payload lengths.
    pub total_original_bytes: u64,
    /// Sum of stored payload lengths.
    pub total_compressed_bytes: u64,
    /// Original bytes divided by compressed bytes (1.0 when empty).
    pub compression_ratio: f64,
}

/// Heuristic: does the prefix look like SochDB's tabular text format?
pub fn is_soch_content(data: &[u8]) -> bool {
    if data.len() < 10 {
        return false;
    }

    // Only inspect the first 100 bytes.
    let s = String::from_utf8_lossy(&data[..data.len().min(100)]);
    s.contains('[') && s.contains('{') && s.contains(':')
}

/// Heuristic: does the payload start like a JSON object or array?
pub fn is_json_content(data: &[u8]) -> bool {
    if data.is_empty() {
        return false;
    }

    let first = data[0];
    first == b'{' || first == b'['
}

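/// Heuristic: a payload is considered compressible when fewer than half the
/// bytes in a sample (up to 256 bytes) are distinct.
///
/// For example, ASCII English text draws from well under 128 distinct byte
/// values, so it passes; 256 uniformly random bytes contain roughly 160
/// distinct values on average, so they fail.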
pub fn is_compressible(data: &[u8]) -> bool {
    if data.len() < 64 {
        return false;
    }

    let sample_size = data.len().min(256);
    let mut seen = [false; 256];
    let mut unique = 0;

    for &byte in &data[..sample_size] {
        if !seen[byte as usize] {
            seen[byte as usize] = true;
            unique += 1;
        }
    }

    unique < sample_size / 2
}

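/// Splits files into fixed-size chunks and stores each chunk as a block.
///
/// A minimal sketch of chunked write and reassembly (`ignore`: crate path
/// not confirmed by this file):
///
/// ```ignore
/// let manager = FileBlockManager::new(4096);
/// let blocks = manager.write_file(&file_bytes)?; // one BlockRef per chunk
/// let restored = manager.read_file(&blocks)?;    // chunks concatenated in order
/// assert_eq!(restored, file_bytes);
/// ```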
pub struct FileBlockManager {
    store: BlockStore,
    block_size: usize,
}

impl FileBlockManager {
    pub fn new(block_size: usize) -> Self {
        Self {
            store: BlockStore::new(),
            // Clamp to the maximum supported block size.
            block_size: block_size.min(MAX_BLOCK_SIZE),
        }
    }

    pub fn write_file(&self, data: &[u8]) -> Result<Vec<BlockRef>> {
        let mut blocks = Vec::new();

        for chunk in data.chunks(self.block_size) {
            let block_ref = self.store.write_block(chunk)?;
            blocks.push(block_ref);
        }

        Ok(blocks)
    }

    pub fn read_file(&self, blocks: &[BlockRef]) -> Result<Vec<u8>> {
        let mut data = Vec::new();

        for block_ref in blocks {
            let block_data = self.store.read_block(block_ref)?;
            data.extend(block_data);
        }

        Ok(data)
    }

    pub fn stats(&self) -> BlockStoreStats {
        self.store.stats()
    }
}

impl Default for FileBlockManager {
    fn default() -> Self {
        Self::new(DEFAULT_BLOCK_SIZE)
    }
}

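// The remainder of this file implements a simple write-ahead log (WAL) and a
// durable store built on it. Each WAL record is a fixed 33-byte header
// followed by its payload, and LSNs are byte offsets into the log file (see
// `WalWriter::append`, which advances `next_lsn` by header size + data length).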
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    BlockWrite = 1,
    Checkpoint = 2,
    Commit = 3,
    TxnBegin = 4,
}

impl WalRecordType {
    fn from_byte(b: u8) -> Option<Self> {
        match b {
            1 => Some(WalRecordType::BlockWrite),
            2 => Some(WalRecordType::Checkpoint),
            3 => Some(WalRecordType::Commit),
            4 => Some(WalRecordType::TxnBegin),
            _ => None,
        }
    }
}

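/// Header preceding every WAL record payload.
///
/// Layout (33 bytes, little-endian), as recovered from `to_bytes` below:
///
/// | bytes  | field       |
/// |--------|-------------|
/// | 0..8   | lsn         |
/// | 8..16  | txn_id      |
/// | 16     | record_type |
/// | 17..25 | page_id     |
/// | 25..29 | data_len    |
/// | 29..33 | crc32       |
///
/// The CRC covers the first 29 header bytes plus the payload; the stored
/// `crc32` field itself is excluded from the computation.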
#[derive(Debug, Clone)]
pub struct WalRecordHeader {
    pub lsn: u64,
    pub txn_id: u64,
    pub record_type: WalRecordType,
    pub page_id: u64,
    pub data_len: u32,
    pub crc32: u32,
}

impl WalRecordHeader {
    /// Fixed header size: 8 + 8 + 1 + 8 + 4 + 4 bytes.
    pub const SIZE: usize = 33;

    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
        let mut buf = [0u8; Self::SIZE];
        LittleEndian::write_u64(&mut buf[0..8], self.lsn);
        LittleEndian::write_u64(&mut buf[8..16], self.txn_id);
        buf[16] = self.record_type as u8;
        LittleEndian::write_u64(&mut buf[17..25], self.page_id);
        LittleEndian::write_u32(&mut buf[25..29], self.data_len);
        LittleEndian::write_u32(&mut buf[29..33], self.crc32);
        buf
    }

    pub fn from_bytes(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::SIZE {
            return Err(SochDBError::Corruption(format!(
                "WAL record header too short: {} < {}",
                buf.len(),
                Self::SIZE
            )));
        }

        let record_type = WalRecordType::from_byte(buf[16]).ok_or_else(|| {
            SochDBError::Corruption(format!("Invalid WAL record type: {}", buf[16]))
        })?;

        Ok(Self {
            lsn: LittleEndian::read_u64(&buf[0..8]),
            txn_id: LittleEndian::read_u64(&buf[8..16]),
            record_type,
            page_id: LittleEndian::read_u64(&buf[17..25]),
            data_len: LittleEndian::read_u32(&buf[25..29]),
            crc32: LittleEndian::read_u32(&buf[29..33]),
        })
    }

    /// Compute the record CRC over the header (minus the CRC field) and data.
    pub fn compute_crc32(&self, data: &[u8]) -> u32 {
        let mut hasher = crc32fast::Hasher::new();

        // Serialize the first 29 header bytes (everything except `crc32`).
        let mut header_buf = [0u8; 29];
        LittleEndian::write_u64(&mut header_buf[0..8], self.lsn);
        LittleEndian::write_u64(&mut header_buf[8..16], self.txn_id);
        header_buf[16] = self.record_type as u8;
        LittleEndian::write_u64(&mut header_buf[17..25], self.page_id);
        LittleEndian::write_u32(&mut header_buf[25..29], self.data_len);

        hasher.update(&header_buf);
        hasher.update(data);
        hasher.finalize()
    }
}

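/// Appends records to a write-ahead log file.
///
/// A minimal append-and-sync sketch (`ignore`: crate path not confirmed by
/// this file):
///
/// ```ignore
/// let mut wal = WalWriter::open("wal.log")?;
/// let lsn = wal.append(1, WalRecordType::BlockWrite, 0, b"payload")?;
/// wal.append(1, WalRecordType::Commit, 0, &[])?;
/// wal.sync()?; // flush + fsync before acknowledging the commit
/// ```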
pub struct WalWriter {
    file: BufWriter<File>,
    next_lsn: u64,
    #[allow(dead_code)]
    path: PathBuf,
}

impl WalWriter {
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&path)?;

        // LSNs are byte offsets, so the next LSN is the current file length.
        let metadata = file.metadata()?;
        let next_lsn = metadata.len();

        Ok(Self {
            file: BufWriter::new(file),
            next_lsn,
            path,
        })
    }

    pub fn append(
        &mut self,
        txn_id: u64,
        record_type: WalRecordType,
        page_id: u64,
        data: &[u8],
    ) -> Result<u64> {
        let lsn = self.next_lsn;

        let mut header = WalRecordHeader {
            lsn,
            txn_id,
            record_type,
            page_id,
            data_len: data.len() as u32,
            crc32: 0, // filled in below
        };

        header.crc32 = header.compute_crc32(data);

        let header_bytes = header.to_bytes();
        self.file.write_all(&header_bytes)?;

        self.file.write_all(data)?;

        self.next_lsn += WalRecordHeader::SIZE as u64 + data.len() as u64;

        Ok(lsn)
    }

    /// Flush buffered writes and fsync the log file.
    pub fn sync(&mut self) -> Result<()> {
        self.file.flush()?;
        self.file.get_ref().sync_all()?;
        Ok(())
    }

    pub fn current_lsn(&self) -> u64 {
        self.next_lsn
    }
}

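/// Sequentially reads and CRC-checks WAL records.
///
/// A minimal replay loop, as a sketch (`ignore`: crate path not confirmed by
/// this file):
///
/// ```ignore
/// let mut reader = WalReader::open("wal.log")?;
/// for record in reader.iter() {
///     let (header, data) = record?; // Err on CRC mismatch or torn header
///     println!("lsn={} type={:?} len={}", header.lsn, header.record_type, data.len());
/// }
/// ```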
pub struct WalReader {
    reader: BufReader<File>,
    #[allow(dead_code)]
    path: PathBuf,
}

impl WalReader {
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        let file = File::open(&path)?;

        Ok(Self {
            reader: BufReader::new(file),
            path,
        })
    }

    /// Read the next record, returning `Ok(None)` at a clean end of file.
    pub fn read_next(&mut self) -> Result<Option<(WalRecordHeader, Vec<u8>)>> {
        let mut header_buf = [0u8; WalRecordHeader::SIZE];
        match self.reader.read_exact(&mut header_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            Err(e) => return Err(e.into()),
        }

        let header = WalRecordHeader::from_bytes(&header_buf)?;

        let mut data = vec![0u8; header.data_len as usize];
        self.reader.read_exact(&mut data)?;

        let computed_crc = header.compute_crc32(&data);
        if computed_crc != header.crc32 {
            return Err(SochDBError::Corruption(format!(
                "WAL CRC mismatch at LSN {}: expected {:#x}, got {:#x}",
                header.lsn, header.crc32, computed_crc
            )));
        }

        Ok(Some((header, data)))
    }

    pub fn iter(&mut self) -> WalIterator<'_> {
        WalIterator { reader: self }
    }
}

pub struct WalIterator<'a> {
    reader: &'a mut WalReader,
}

impl<'a> Iterator for WalIterator<'a> {
    type Item = Result<(WalRecordHeader, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        match self.reader.read_next() {
            Ok(Some(record)) => Some(Ok(record)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

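/// Block store with WAL-backed durability: every write is logged and synced
/// before it is applied to the in-memory store.
///
/// A minimal write/commit/checkpoint/recover sketch (`ignore`: crate path
/// not confirmed by this file; the directory is illustrative):
///
/// ```ignore
/// let store = DurableBlockStore::open("/tmp/sochdb")?;
/// let r = store.write_block(1, b"payload")?; // WAL append + sync, then store
/// store.commit(1)?;                          // commit record, synced
/// store.checkpoint()?;                       // flush dirty pages to blocks.dat
///
/// // After a crash, replay committed transactions from the WAL:
/// let mut store = DurableBlockStore::open("/tmp/sochdb")?;
/// let stats = store.recover()?;
/// ```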
pub struct DurableBlockStore {
    /// In-memory block store used for reads.
    store: BlockStore,
    /// Write-ahead log; appended to and synced before writes are applied.
    wal: parking_lot::Mutex<WalWriter>,
    /// On-disk data file that checkpoints flush dirty pages into.
    data_file: parking_lot::Mutex<File>,
    /// Pages written since the last checkpoint.
    dirty_pages: RwLock<HashMap<u64, Vec<u8>>>,
    /// LSN of the most recent checkpoint record.
    checkpoint_lsn: AtomicU64,
    data_dir: PathBuf,
    next_page_id: AtomicU64,
}

impl DurableBlockStore {
    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        let data_dir = data_dir.as_ref().to_path_buf();

        std::fs::create_dir_all(&data_dir)?;

        let wal_path = data_dir.join("wal.log");
        let data_path = data_dir.join("blocks.dat");

        let wal = WalWriter::open(&wal_path)?;

        let data_file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(false)
            .open(&data_path)?;

        let store = Self {
            store: BlockStore::new(),
            wal: parking_lot::Mutex::new(wal),
            data_file: parking_lot::Mutex::new(data_file),
            dirty_pages: RwLock::new(HashMap::new()),
            checkpoint_lsn: AtomicU64::new(0),
            data_dir,
            next_page_id: AtomicU64::new(0),
        };

        Ok(store)
    }

    pub fn write_block(&self, txn_id: u64, data: &[u8]) -> Result<BlockRef> {
        let page_id = self.next_page_id.fetch_add(1, Ordering::SeqCst);

        // Write-ahead: log and sync before touching the in-memory store.
        {
            let mut wal = self.wal.lock();
            wal.append(txn_id, WalRecordType::BlockWrite, page_id, data)?;
            wal.sync()?;
        }

        let block_ref = self.store.write_block(data)?;

        self.dirty_pages.write().insert(page_id, data.to_vec());

        Ok(block_ref)
    }

    pub fn read_block(&self, block_ref: &BlockRef) -> Result<Vec<u8>> {
        self.store.read_block(block_ref)
    }

    pub fn commit(&self, txn_id: u64) -> Result<u64> {
        let mut wal = self.wal.lock();
        let lsn = wal.append(txn_id, WalRecordType::Commit, 0, &[])?;
        wal.sync()?;
        Ok(lsn)
    }

    pub fn checkpoint(&self) -> Result<u64> {
        // Take ownership of the dirty pages so the lock is released quickly.
        let dirty: Vec<(u64, Vec<u8>)> = {
            let mut pages = self.dirty_pages.write();
            pages.drain().collect()
        };

        {
            let mut file = self.data_file.lock();
            for (page_id, data) in &dirty {
                // Pages occupy fixed-size slots in the data file.
                let offset = *page_id * (DEFAULT_BLOCK_SIZE as u64 + BlockHeader::SIZE as u64);
                file.seek(SeekFrom::Start(offset))?;
                file.write_all(data)?;
            }
            file.sync_all()?;
        }

        let lsn = {
            let mut wal = self.wal.lock();
            let lsn = wal.append(0, WalRecordType::Checkpoint, 0, &[])?;
            wal.sync()?;
            lsn
        };

        self.checkpoint_lsn.store(lsn, Ordering::SeqCst);

        Ok(lsn)
    }

    /// Redo-only recovery: replay block writes belonging to committed
    /// transactions; anything uncommitted is discarded.
    pub fn recover(&mut self) -> Result<RecoveryStats> {
        let wal_path = self.data_dir.join("wal.log");

        if !wal_path.exists() {
            return Ok(RecoveryStats::default());
        }

        let mut reader = WalReader::open(&wal_path)?;
        let mut stats = RecoveryStats::default();

        let mut pending_txns: HashMap<u64, Vec<(u64, Vec<u8>)>> = HashMap::new();
        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();

        for record_result in reader.iter() {
            let (header, data) = record_result?;
            stats.records_read += 1;

            match header.record_type {
                WalRecordType::BlockWrite => {
                    pending_txns
                        .entry(header.txn_id)
                        .or_default()
                        .push((header.page_id, data));
                }
                WalRecordType::Commit => {
                    committed_txns.insert(header.txn_id);
                    stats.txns_committed += 1;
                }
                WalRecordType::Checkpoint => {
                    self.checkpoint_lsn.store(header.lsn, Ordering::SeqCst);
                    stats.checkpoints_found += 1;
                }
                WalRecordType::TxnBegin => {
                    // No action needed on replay.
                }
            }
        }

        for txn_id in &committed_txns {
            if let Some(writes) = pending_txns.remove(txn_id) {
                for (page_id, data) in writes {
                    self.store.write_block(&data)?;
                    self.next_page_id.fetch_max(page_id + 1, Ordering::SeqCst);
                    stats.blocks_recovered += 1;
                }
            }
        }

        // Whatever is left in `pending_txns` never committed.
        stats.txns_aborted = pending_txns.len();

        Ok(stats)
    }

    pub fn stats(&self) -> DurableBlockStoreStats {
        let store_stats = self.store.stats();
        DurableBlockStoreStats {
            block_stats: store_stats,
            dirty_page_count: self.dirty_pages.read().len(),
            checkpoint_lsn: self.checkpoint_lsn.load(Ordering::SeqCst),
            wal_size: self.wal.lock().current_lsn(),
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    pub records_read: usize,
    pub txns_committed: usize,
    pub txns_aborted: usize,
    pub blocks_recovered: usize,
    pub checkpoints_found: usize,
}

#[derive(Debug, Clone)]
pub struct DurableBlockStoreStats {
    pub block_stats: BlockStoreStats,
    pub dirty_page_count: usize,
    pub checkpoint_lsn: u64,
    pub wal_size: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_block_store_write_read() {
        let store = BlockStore::new();

        let data = b"Hello, SochFS block storage!";
        let block_ref = store.write_block(data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);
    }

    #[test]
    fn test_compression_selection() {
        let store = BlockStore::new();

        let small = b"hi";
        assert_eq!(store.select_compression(small), BlockCompression::None);

        let toon = b"users[5]{id,name}:\n1,Alice\n2,Bob\n3,Charlie";
        let compression = store.select_compression(toon);
        assert!(compression == BlockCompression::Zstd || compression == BlockCompression::None);

        let json = br#"{"users": [{"id": 1, "name": "Alice"}]}"#;
        let compression = store.select_compression(json);
        assert!(compression == BlockCompression::Lz4 || compression == BlockCompression::None);
    }

    #[test]
    fn test_lz4_compression() {
        let store = BlockStore::new();

        let data = "Hello, world! ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Lz4)
            .unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_zstd_compression() {
        let store = BlockStore::new();

        let data = "TOON format is very repetitive ".repeat(100);
        let block_ref = store
            .write_block_with_compression(data.as_bytes(), BlockCompression::Zstd)
            .unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_file_block_manager() {
        let manager = FileBlockManager::new(1024);

        let data = "Test data ".repeat(500);
        let blocks = manager.write_file(data.as_bytes()).unwrap();

        assert!(blocks.len() > 1);

        let read_data = manager.read_file(&blocks).unwrap();
        assert_eq!(read_data, data.as_bytes());
    }

    #[test]
    fn test_stats() {
        let store = BlockStore::new();

        let data = "Repetitive data pattern ".repeat(100);
        store.write_block(data.as_bytes()).unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(stats.compression_ratio >= 1.0);
    }

    #[test]
    fn test_block_header_fixed_layout() {
        let header = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Zstd as u8,
            original_size: 4096,
            compressed_size: 1024,
            checksum: 0xDEADBEEF,
        };

        let bytes = header.to_bytes();

        assert_eq!(bytes.len(), 17);
        assert_eq!(&bytes[0..4], b"TBLK");
        assert_eq!(bytes[4], BlockCompression::Zstd as u8);
        assert_eq!(LittleEndian::read_u32(&bytes[5..9]), 4096);
        assert_eq!(LittleEndian::read_u32(&bytes[9..13]), 1024);
        assert_eq!(LittleEndian::read_u32(&bytes[13..17]), 0xDEADBEEF);
    }

    #[test]
    fn test_block_header_roundtrip() {
        let original = BlockHeader {
            magic: BlockHeader::MAGIC,
            compression: BlockCompression::Lz4 as u8,
            original_size: 65536,
            compressed_size: 32768,
            checksum: 0x12345678,
        };

        let bytes = original.to_bytes();
        let recovered = BlockHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.original_size, original.original_size);
        assert_eq!(recovered.compressed_size, original.compressed_size);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_header_invalid_magic() {
        let mut bytes = [0u8; 17];
        bytes[0..4].copy_from_slice(b"XXXX"); // corrupt the magic

        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("Invalid block magic"));
    }

    #[test]
    fn test_block_header_too_short() {
        let bytes = [0u8; 10]; // shorter than BlockHeader::SIZE
        let result = BlockHeader::from_bytes(&bytes);
        assert!(result.is_err());
    }

    #[test]
    fn test_block_ref_fixed_layout() {
        let block_ref = BlockRef {
            store_offset: 0x123456789ABCDEF0,
            compressed_len: 4096,
            original_len: 8192,
            compression: BlockCompression::Zstd,
            checksum: 0xCAFEBABE,
        };

        let bytes = block_ref.to_bytes().unwrap();

        assert_eq!(bytes.len(), 21);
        assert_eq!(LittleEndian::read_u64(&bytes[0..8]), 0x123456789ABCDEF0);
        assert_eq!(LittleEndian::read_u32(&bytes[8..12]), 4096);
        assert_eq!(LittleEndian::read_u32(&bytes[12..16]), 8192);
        assert_eq!(bytes[16], BlockCompression::Zstd as u8);
        assert_eq!(LittleEndian::read_u32(&bytes[17..21]), 0xCAFEBABE);
    }

    #[test]
    fn test_block_ref_roundtrip() {
        let original = BlockRef {
            store_offset: u64::MAX,
            compressed_len: u32::MAX,
            original_len: u32::MAX,
            compression: BlockCompression::None,
            checksum: u32::MAX,
        };

        let bytes = original.to_bytes().unwrap();
        let recovered = BlockRef::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.store_offset, original.store_offset);
        assert_eq!(recovered.compressed_len, original.compressed_len);
        assert_eq!(recovered.original_len, original.original_len);
        assert_eq!(recovered.compression, original.compression);
        assert_eq!(recovered.checksum, original.checksum);
    }

    #[test]
    fn test_block_ref_too_short() {
        let bytes = [0u8; 10]; // shorter than BlockRef::SERIALIZED_SIZE
        let result = BlockRef::from_bytes(&bytes);
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlockRef too short"));
    }

    #[test]
    fn test_cross_platform_compatibility() {
        let block_ref = BlockRef {
            store_offset: 0x0102030405060708,
            compressed_len: 0x0A0B0C0D,
            original_len: 0x0E0F1011,
            compression: BlockCompression::Lz4,
            checksum: 0x12131415,
        };

        let bytes = block_ref.to_bytes().unwrap();

        // Little-endian: least significant byte first.
        assert_eq!(bytes[0], 0x08); // low byte of store_offset
        assert_eq!(bytes[7], 0x01); // high byte of store_offset
        assert_eq!(bytes[8], 0x0D); // low byte of compressed_len
        assert_eq!(bytes[17], 0x15); // low byte of checksum
    }

    #[test]
    fn test_lz4_compression_roundtrip() {
        let store = BlockStore::new();

        let data: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        assert!(block_ref.compressed_len <= block_ref.original_len);
    }

    #[test]
    fn test_zstd_compression_roundtrip() {
        let store = BlockStore::new();

        let data = vec![0u8; 8192];

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
        assert!(block_ref.compressed_len < block_ref.original_len / 2);
    }

    #[test]
    fn test_compression_fallback_on_incompressible() {
        let store = BlockStore::new();

        // Generate a byte pattern with many distinct values.
        let mut data = vec![0u8; 256];
        for (i, byte) in data.iter_mut().enumerate().take(256) {
            *byte = ((i * 17 + 31) % 256) as u8;
        }

        let block_ref = store
            .write_block_with_compression(&data, BlockCompression::Lz4)
            .unwrap();
        let recovered = store.read_block(&block_ref).unwrap();

        assert_eq!(recovered, data);
    }

    #[test]
    fn test_automatic_compression_selection() {
        let store = BlockStore::new();

        let json_data = br#"{"name": "test", "value": 123, "items": [1, 2, 3]}"#.repeat(10);
        let json_ref = store.write_block(&json_data).unwrap();
        let json_recovered = store.read_block(&json_ref).unwrap();
        assert_eq!(json_recovered, json_data);

        let mut soch_data = vec![0u8; 256];
        soch_data[0..4].copy_from_slice(b"TOON");

        let soch_ref = store.write_block(&soch_data).unwrap();
        let soch_recovered = store.read_block(&soch_ref).unwrap();
        assert_eq!(soch_recovered, soch_data);
    }

    #[test]
    fn test_small_data_no_compression() {
        let store = BlockStore::new();

        let small_data = vec![42u8; 64];
        let block_ref = store.write_block(&small_data).unwrap();

        assert_eq!(block_ref.compression, BlockCompression::None);

        let recovered = store.read_block(&block_ref).unwrap();
        assert_eq!(recovered, small_data);
    }

    #[test]
    fn test_compression_stats() {
        let store = BlockStore::new();

        let data = vec![0u8; 4096];
        store
            .write_block_with_compression(&data, BlockCompression::Zstd)
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.block_count, 1);
        assert!(
            stats.compression_ratio > 1.0,
            "Compression should reduce size"
        );
        assert!(stats.total_original_bytes > stats.total_compressed_bytes);
    }

    #[test]
    fn test_wal_record_header_roundtrip() {
        let original = WalRecordHeader {
            lsn: 12345,
            txn_id: 67890,
            record_type: WalRecordType::BlockWrite,
            page_id: 42,
            data_len: 4096,
            crc32: 0xDEADBEEF,
        };

        let bytes = original.to_bytes();
        let recovered = WalRecordHeader::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, original.lsn);
        assert_eq!(recovered.txn_id, original.txn_id);
        assert_eq!(recovered.record_type, original.record_type);
        assert_eq!(recovered.page_id, original.page_id);
        assert_eq!(recovered.data_len, original.data_len);
        assert_eq!(recovered.crc32, original.crc32);
    }

    #[test]
    fn test_wal_crc32() {
        let header = WalRecordHeader {
            lsn: 100,
            txn_id: 1,
            record_type: WalRecordType::BlockWrite,
            page_id: 0,
            data_len: 4,
            crc32: 0,
        };

        let data = b"test";
        let crc1 = header.compute_crc32(data);
        let crc2 = header.compute_crc32(data);

        assert_eq!(crc1, crc2, "CRC should be deterministic");

        let different_data = b"TEST";
        let crc3 = header.compute_crc32(different_data);
        assert_ne!(crc1, crc3, "Different data should have different CRC");
    }

    #[test]
    fn test_durable_block_store_basic() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        let data = b"Hello, durable block store!";
        let block_ref = store.write_block(1, data).unwrap();

        let read_data = store.read_block(&block_ref).unwrap();
        assert_eq!(read_data, data);

        store.commit(1).unwrap();

        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 1);
    }

    #[test]
    fn test_durable_block_store_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        let store = DurableBlockStore::open(dir.path()).unwrap();

        store.write_block(1, b"block1").unwrap();
        store.write_block(1, b"block2").unwrap();
        store.write_block(1, b"block3").unwrap();
        store.commit(1).unwrap();

        let checkpoint_lsn = store.checkpoint().unwrap();
        assert!(checkpoint_lsn > 0);

        let stats = store.stats();
        assert_eq!(stats.dirty_page_count, 0);
        assert_eq!(stats.checkpoint_lsn, checkpoint_lsn);
    }

    #[test]
    fn test_durable_block_store_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Write and commit, then drop the store to simulate a restart.
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"data1").unwrap();
            store.write_block(1, b"data2").unwrap();
            store.commit(1).unwrap();
        }

        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            assert_eq!(stats.txns_committed, 1);
            assert_eq!(stats.blocks_recovered, 2);
            assert_eq!(stats.txns_aborted, 0);
        }
    }

    #[test]
    fn test_durable_block_store_uncommitted_recovery() {
        let dir = tempfile::tempdir().unwrap();

        // Write without committing, then drop the store.
        {
            let store = DurableBlockStore::open(dir.path()).unwrap();
            store.write_block(1, b"uncommitted_data").unwrap();
        }

        {
            let mut store = DurableBlockStore::open(dir.path()).unwrap();
            let stats = store.recover().unwrap();

            assert_eq!(stats.txns_committed, 0);
            assert_eq!(stats.txns_aborted, 1);
            assert_eq!(stats.blocks_recovered, 0);
        }
    }

    #[test]
    fn test_wal_writer_reader_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let mut writer = WalWriter::open(&wal_path).unwrap();
            writer.append(1, WalRecordType::TxnBegin, 0, &[]).unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 0, b"data1")
                .unwrap();
            writer
                .append(1, WalRecordType::BlockWrite, 1, b"data2")
                .unwrap();
            writer.append(1, WalRecordType::Commit, 0, &[]).unwrap();
            writer.sync().unwrap();
        }

        {
            let mut reader = WalReader::open(&wal_path).unwrap();
            let mut records = Vec::new();
            for record in reader.iter() {
                records.push(record.unwrap());
            }

            assert_eq!(records.len(), 4);
            assert_eq!(records[0].0.record_type, WalRecordType::TxnBegin);
            assert_eq!(records[1].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[1].1, b"data1");
            assert_eq!(records[2].0.record_type, WalRecordType::BlockWrite);
            assert_eq!(records[2].1, b"data2");
            assert_eq!(records[3].0.record_type, WalRecordType::Commit);
        }
    }
}