1use byteorder::{LittleEndian, ReadBytesExt};
49use parking_lot::Mutex;
50use std::cell::Cell;
51use std::collections::HashSet;
52use std::fs::{File, OpenOptions};
53use std::io::{BufReader, BufWriter, Read, Write};
54use std::path::{Path, PathBuf};
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::time::{Instant, SystemTime, UNIX_EPOCH};
57use sochdb_core::{Result, SochDBError, WalRecordType};
58
/// How long a cached wall-clock reading may be extrapolated before the
/// system clock is consulted again, in nanoseconds (1 ms).
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Per-thread cache of `(monotonic anchor, wall-clock microseconds at the anchor)`.
    ///
    /// A wall-clock value of `0` marks the cache as not yet populated.
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
}

/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// The system clock is read at most once per [`CACHE_VALIDITY_NS`] per thread;
/// in between, the last reading is advanced by the elapsed monotonic time.
///
/// The first call on a thread always reads the system clock. Without that
/// guard the zero-initialised cache would be treated as valid and the function
/// would return timestamps close to `0` for the first millisecond.
///
/// If the system clock is set before the Unix epoch the function returns `0`
/// instead of panicking.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (anchor, anchor_us) = cache.get();
        let elapsed_ns = anchor.elapsed().as_nanos() as u64;

        // `anchor_us == 0` means the cache has never been filled on this thread.
        if anchor_us != 0 && elapsed_ns < CACHE_VALIDITY_NS {
            anchor_us + elapsed_ns / 1000
        } else {
            let now_us = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_micros() as u64;
            cache.set((Instant::now(), now_us));
            now_us
        }
    })
}
109
// RECORD_HEADER_SIZE: bytes of fixed-size header at the start of every record, in
// serialization order: length prefix (u32) + record type (u8) + txn id (u64) +
// timestamp (u64) + key length (u32) + value length (u32).
// CHECKSUM_SIZE: bytes of the CRC32 trailer that follows the key and value.
const RECORD_HEADER_SIZE: usize = 4 + 1 + 8 + 8 + 4 + 4; const CHECKSUM_SIZE: usize = 4;

/// Initial capacity, in bytes (32 KiB), of the buffer created by `TxnWalBuffer::new`.
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 * 1024;
118
/// Staging buffer that accumulates serialized data records for one transaction
/// so they can be handed to a `TxnWal` in a single write.
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction id stamped into every record appended to this buffer.
    txn_id: u64,
    /// Serialized records stored back to back in the on-disk record format.
    buffer: Vec<u8>,
    /// Number of records currently held in `buffer`.
    entry_count: usize,
}
164
165impl TxnWalBuffer {
166 #[inline]
168 pub fn new(txn_id: u64) -> Self {
169 Self {
170 txn_id,
171 buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
172 entry_count: 0,
173 }
174 }
175
176 #[inline]
178 pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
179 Self {
180 txn_id,
181 buffer: Vec::with_capacity(capacity),
182 entry_count: 0,
183 }
184 }
185
186 #[inline]
193 pub fn append(&mut self, key: &[u8], value: &[u8]) {
194 let timestamp_us = cached_timestamp_us();
196
197 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
198 let entry_start = self.buffer.len();
199
200 self.buffer.extend_from_slice(&[0u8; 4]);
202
203 let mut hasher = crc32fast::Hasher::new();
204
205 let record_type_byte = WalRecordType::Data as u8;
207 self.buffer.push(record_type_byte);
208 hasher.update(&[record_type_byte]);
209
210 let txn_bytes = self.txn_id.to_le_bytes();
212 self.buffer.extend_from_slice(&txn_bytes);
213 hasher.update(&txn_bytes);
214
215 let ts_bytes = timestamp_us.to_le_bytes();
217 self.buffer.extend_from_slice(&ts_bytes);
218 hasher.update(&ts_bytes);
219
220 let key_len_bytes = (key.len() as u32).to_le_bytes();
222 self.buffer.extend_from_slice(&key_len_bytes);
223 hasher.update(&key_len_bytes);
224
225 let val_len_bytes = (value.len() as u32).to_le_bytes();
227 self.buffer.extend_from_slice(&val_len_bytes);
228 hasher.update(&val_len_bytes);
229
230 self.buffer.extend_from_slice(key);
232 hasher.update(key);
233
234 self.buffer.extend_from_slice(value);
236 hasher.update(value);
237
238 self.buffer
240 .extend_from_slice(&hasher.finalize().to_le_bytes());
241
242 let content_len = (total_len - 4) as u32;
244 self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
245
246 self.entry_count += 1;
247 }
248
249 #[inline]
257 pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
258 wal.flush_buffer(self)
259 }
260
261 #[inline]
263 pub fn clear(&mut self) {
264 self.buffer.clear();
265 self.entry_count = 0;
266 }
267
268 #[inline]
270 pub fn entry_count(&self) -> usize {
271 self.entry_count
272 }
273
274 #[inline]
276 pub fn bytes_buffered(&self) -> usize {
277 self.buffer.len()
278 }
279
280 #[inline]
282 pub fn is_empty(&self) -> bool {
283 self.buffer.is_empty()
284 }
285}
286
/// One decoded write-ahead-log record.
#[derive(Debug, Clone)]
pub struct TxnWalEntry {
    /// Kind of record (data, transaction begin/commit/abort, checkpoint, ...).
    pub record_type: WalRecordType,
    /// Transaction the record belongs to.
    pub txn_id: u64,
    /// Creation time in microseconds, as returned by `cached_timestamp_us`.
    pub timestamp_us: u64,
    /// Key bytes; empty for the control records built by the constructors below.
    pub key: Vec<u8>,
    /// Value bytes; empty for transaction begin/commit/abort records.
    pub value: Vec<u8>,
}
301
302impl TxnWalEntry {
303 pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
305 Self {
306 record_type: WalRecordType::Data,
307 txn_id,
308 timestamp_us: Self::now_us(),
309 key,
310 value,
311 }
312 }
313
314 pub fn txn_begin(txn_id: u64) -> Self {
316 Self {
317 record_type: WalRecordType::TxnBegin,
318 txn_id,
319 timestamp_us: Self::now_us(),
320 key: Vec::new(),
321 value: Vec::new(),
322 }
323 }
324
325 pub fn txn_commit(txn_id: u64) -> Self {
327 Self {
328 record_type: WalRecordType::TxnCommit,
329 txn_id,
330 timestamp_us: Self::now_us(),
331 key: Vec::new(),
332 value: Vec::new(),
333 }
334 }
335
336 pub fn txn_abort(txn_id: u64) -> Self {
338 Self {
339 record_type: WalRecordType::TxnAbort,
340 txn_id,
341 timestamp_us: Self::now_us(),
342 key: Vec::new(),
343 value: Vec::new(),
344 }
345 }
346
347 pub fn checkpoint(txn_id: u64) -> Self {
349 Self {
350 record_type: WalRecordType::Checkpoint,
351 txn_id,
352 timestamp_us: Self::now_us(),
353 key: Vec::new(),
354 value: Vec::new(),
355 }
356 }
357
358 pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
360 Self {
361 record_type: WalRecordType::SchemaChange,
362 txn_id,
363 timestamp_us: Self::now_us(),
364 key: Vec::new(),
365 value: schema_data,
366 }
367 }
368
369 #[inline]
371 fn now_us() -> u64 {
372 cached_timestamp_us()
373 }
374
375 pub fn checksum(&self) -> u32 {
381 let mut hasher = crc32fast::Hasher::new();
382 hasher.update(&[self.record_type as u8]);
383 hasher.update(&self.txn_id.to_le_bytes());
384 hasher.update(&self.timestamp_us.to_le_bytes());
385 hasher.update(&(self.key.len() as u32).to_le_bytes());
386 hasher.update(&(self.value.len() as u32).to_le_bytes());
387 hasher.update(&self.key);
388 hasher.update(&self.value);
389 hasher.finalize()
390 }
391
392 pub fn to_bytes(&self) -> Vec<u8> {
397 let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
398 let mut buf = Vec::with_capacity(total_len);
399 let mut hasher = crc32fast::Hasher::new();
400
401 let content_len = (total_len - 4) as u32;
403 buf.extend_from_slice(&content_len.to_le_bytes());
404
405 let record_type_byte = self.record_type as u8;
407 buf.push(record_type_byte);
408 hasher.update(&[record_type_byte]);
409
410 let txn_bytes = self.txn_id.to_le_bytes();
412 buf.extend_from_slice(&txn_bytes);
413 hasher.update(&txn_bytes);
414
415 let ts_bytes = self.timestamp_us.to_le_bytes();
417 buf.extend_from_slice(&ts_bytes);
418 hasher.update(&ts_bytes);
419
420 let key_len_bytes = (self.key.len() as u32).to_le_bytes();
422 buf.extend_from_slice(&key_len_bytes);
423 hasher.update(&key_len_bytes);
424
425 let val_len_bytes = (self.value.len() as u32).to_le_bytes();
427 buf.extend_from_slice(&val_len_bytes);
428 hasher.update(&val_len_bytes);
429
430 buf.extend_from_slice(&self.key);
432 hasher.update(&self.key);
433
434 buf.extend_from_slice(&self.value);
436 hasher.update(&self.value);
437
438 buf.extend_from_slice(&hasher.finalize().to_le_bytes());
440
441 buf
442 }
443
444 pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
451 let content_len = reader.read_u32::<LittleEndian>()?;
453 if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
454 return Err(SochDBError::Corruption("WAL entry too short".into()));
455 }
456
457 let record_type_byte = reader.read_u8()?;
459 let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
460 SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
461 })?;
462
463 let txn_id = reader.read_u64::<LittleEndian>()?;
465
466 let timestamp_us = reader.read_u64::<LittleEndian>()?;
468
469 let key_len = reader.read_u32::<LittleEndian>()? as usize;
471
472 let value_len = reader.read_u32::<LittleEndian>()? as usize;
474
475 let mut key = vec![0u8; key_len];
477 reader.read_exact(&mut key)?;
478
479 let mut value = vec![0u8; value_len];
481 reader.read_exact(&mut value)?;
482
483 let stored_checksum = reader.read_u32::<LittleEndian>()?;
485
486 let entry = Self {
487 record_type,
488 txn_id,
489 timestamp_us,
490 key,
491 value,
492 };
493
494 if entry.checksum() != stored_checksum {
496 return Err(SochDBError::Corruption(format!(
497 "WAL checksum mismatch for txn_id {}: expected {}, got {}",
498 txn_id,
499 entry.checksum(),
500 stored_checksum
501 )));
502 }
503
504 Ok(entry)
505 }
506}
507
/// Append-only write-ahead log backed by a single file.
pub struct TxnWal {
    /// Location of the log file; reopened read-only for replay and recovery.
    path: PathBuf,
    /// Buffered writer over the log file, guarded by a mutex so records from
    /// different callers are not interleaved.
    writer: Mutex<BufWriter<File>>,
    /// Next transaction id handed out by `alloc_txn_id`.
    next_txn_id: AtomicU64,
    /// Count of records written; also used as the sequence number returned
    /// by the append methods.
    sequence: AtomicU64,
    /// Bytes written since the last `sync` or `truncate`.
    bytes_since_sync: AtomicU64,
    /// Coarse timestamp used by `write_no_flush_refs`; refreshed by
    /// `get_cached_timestamp` whenever the low 10 bits of `sequence` are zero.
    cached_timestamp_us: AtomicU64,
}
524
525impl TxnWal {
526 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
528 let path = path.as_ref().to_path_buf();
529
530 if let Some(parent) = path.parent() {
532 std::fs::create_dir_all(parent)?;
533 }
534
535 let file = OpenOptions::new()
536 .create(true)
537 .append(true)
538 .read(true)
539 .open(&path)?;
540
541 let now_us = cached_timestamp_us();
544
545 let wal = Self {
546 path,
547 writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
548 next_txn_id: AtomicU64::new(1),
549 sequence: AtomicU64::new(0),
550 bytes_since_sync: AtomicU64::new(0),
551 cached_timestamp_us: AtomicU64::new(now_us),
552 };
553
554 wal.recover_state()?;
556
557 Ok(wal)
558 }
559
560 fn recover_state(&self) -> Result<()> {
562 let file = File::open(&self.path)?;
563 let mut reader = BufReader::new(file);
564 let mut max_txn_id: u64 = 0;
565 let mut count: u64 = 0;
566
567 loop {
568 match TxnWalEntry::from_reader(&mut reader) {
569 Ok(entry) => {
570 if entry.txn_id > max_txn_id {
571 max_txn_id = entry.txn_id;
572 }
573 count += 1;
574 }
575 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
576 break;
577 }
578 Err(_) => {
579 break;
581 }
582 }
583 }
584
585 self.next_txn_id.store(max_txn_id + 1, Ordering::SeqCst);
586 self.sequence.store(count, Ordering::SeqCst);
587
588 Ok(())
589 }
590
591 #[inline]
596 fn get_cached_timestamp(&self) -> u64 {
597 let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
599
600 let seq = self.sequence.load(Ordering::Relaxed);
603 if seq & 0x3FF == 0 {
604 let now_us = cached_timestamp_us();
606 self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
607 return now_us;
608 }
609
610 cached
611 }
612
613 pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
619 let bytes = entry.to_bytes();
620 let mut writer = self.writer.lock();
621
622 writer.write_all(&bytes)?;
623 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
627 self.bytes_since_sync
628 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
629
630 Ok(seq)
631 }
632
633 #[inline]
637 pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
638 let bytes = entry.to_bytes();
639 let mut writer = self.writer.lock();
640
641 writer.write_all(&bytes)?;
642 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
645 self.bytes_since_sync
646 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
647
648 Ok(seq)
649 }
650
651 #[inline]
653 pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
654 let entry = TxnWalEntry::data(txn_id, key, value);
655 self.append_no_flush(&entry)
656 }
657
658 #[inline]
663 pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
664 let timestamp_us = self.get_cached_timestamp();
666
667 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
668 let mut hasher = crc32fast::Hasher::new();
669
670 let mut writer = self.writer.lock();
671
672 let content_len = (total_len - 4) as u32;
674 writer.write_all(&content_len.to_le_bytes())?;
675
676 let record_type_byte = WalRecordType::Data as u8;
678 writer.write_all(&[record_type_byte])?;
679 hasher.update(&[record_type_byte]);
680
681 let txn_bytes = txn_id.to_le_bytes();
683 writer.write_all(&txn_bytes)?;
684 hasher.update(&txn_bytes);
685
686 let ts_bytes = timestamp_us.to_le_bytes();
688 writer.write_all(&ts_bytes)?;
689 hasher.update(&ts_bytes);
690
691 let key_len_bytes = (key.len() as u32).to_le_bytes();
693 writer.write_all(&key_len_bytes)?;
694 hasher.update(&key_len_bytes);
695
696 let val_len_bytes = (value.len() as u32).to_le_bytes();
698 writer.write_all(&val_len_bytes)?;
699 hasher.update(&val_len_bytes);
700
701 writer.write_all(key)?;
703 hasher.update(key);
704
705 writer.write_all(value)?;
707 hasher.update(value);
708
709 writer.write_all(&hasher.finalize().to_le_bytes())?;
711
712 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
713 self.bytes_since_sync
714 .fetch_add(total_len as u64, Ordering::Relaxed);
715
716 Ok(seq)
717 }
718
719 pub fn flush(&self) -> Result<()> {
721 let mut writer = self.writer.lock();
722 writer.flush()?;
723 Ok(())
724 }
725
726 pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
728 let seq = self.append(entry)?;
729 self.sync()?;
730 Ok(seq)
731 }
732
733 pub fn sync(&self) -> Result<()> {
735 let writer = self.writer.lock();
736 writer.get_ref().sync_all()?;
737 self.bytes_since_sync.store(0, Ordering::Relaxed);
738 Ok(())
739 }
740
741 #[inline]
755 pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
756 if buffer.is_empty() {
757 return Ok(0);
758 }
759
760 let mut writer = self.writer.lock();
761 writer.write_all(&buffer.buffer)?;
762
763 let seq = self
764 .sequence
765 .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
766 self.bytes_since_sync
767 .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
768
769 Ok(seq)
770 }
771
772 pub fn size_bytes(&self) -> u64 {
774 std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
775 }
776
777 pub fn alloc_txn_id(&self) -> u64 {
779 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
780 }
781
782 pub fn begin_transaction(&self) -> Result<u64> {
784 let txn_id = self.alloc_txn_id();
785 let entry = TxnWalEntry::txn_begin(txn_id);
786 self.append(&entry)?;
787 Ok(txn_id)
788 }
789
790 pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
794 self.flush()?;
796
797 let entry = TxnWalEntry::txn_commit(txn_id);
799 self.append_sync(&entry)?;
800 Ok(())
801 }
802
803 pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
805 let entry = TxnWalEntry::txn_abort(txn_id);
806 self.append(&entry)?;
807 Ok(())
808 }
809
810 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
812 let entry = TxnWalEntry::data(txn_id, key, value);
813 self.append(&entry)
814 }
815
816 #[allow(clippy::type_complexity)]
820 pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
821 let file = File::open(&self.path)?;
822 let mut reader = BufReader::new(file);
823
824 let mut committed_txns: HashSet<u64> = HashSet::new();
825 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
826 std::collections::HashMap::new();
827
828 loop {
830 match TxnWalEntry::from_reader(&mut reader) {
831 Ok(entry) => match entry.record_type {
832 WalRecordType::TxnBegin => {
833 pending_writes.insert(entry.txn_id, Vec::new());
834 }
835 WalRecordType::Data => {
836 if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
837 writes.push((entry.key, entry.value));
838 }
839 }
840 WalRecordType::TxnCommit => {
841 committed_txns.insert(entry.txn_id);
842 }
843 WalRecordType::TxnAbort => {
844 pending_writes.remove(&entry.txn_id);
845 }
846 _ => {}
847 },
848 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
849 break;
850 }
851 Err(_) => {
852 break;
853 }
854 }
855 }
856
857 let mut result = Vec::new();
859 let mut txn_count = 0;
860
861 for (txn_id, writes) in pending_writes {
862 if committed_txns.contains(&txn_id) {
863 result.extend(writes);
864 txn_count += 1;
865 }
866 }
868
869 Ok((result, txn_count))
870 }
871
872 pub fn replay<F>(&self, mut callback: F) -> Result<u64>
874 where
875 F: FnMut(TxnWalEntry) -> Result<()>,
876 {
877 let file = File::open(&self.path)?;
878 let mut reader = BufReader::new(file);
879 let mut count = 0u64;
880
881 loop {
882 match TxnWalEntry::from_reader(&mut reader) {
883 Ok(entry) => {
884 callback(entry)?;
885 count += 1;
886 }
887 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
888 break;
889 }
890 Err(e) => {
891 eprintln!("WAL replay warning: {:?}", e);
893 break;
894 }
895 }
896 }
897
898 Ok(count)
899 }
900
901 pub fn truncate(&self) -> Result<()> {
903 let writer = self.writer.lock();
904 let file = writer.get_ref();
905 file.set_len(0)?;
906 file.sync_all()?;
907 self.sequence.store(0, Ordering::SeqCst);
908 self.bytes_since_sync.store(0, Ordering::Relaxed);
909 Ok(())
910 }
911
912 pub fn write_checkpoint(&self) -> Result<u64> {
914 let entry = TxnWalEntry::checkpoint(0);
915 self.append_sync(&entry)
916 }
917
918 pub fn append_clr(
924 &self,
925 txn_id: u64,
926 _original_lsn: u64,
927 undo_next_lsn: Option<u64>,
928 undo_data: &[u8],
929 ) -> Result<u64> {
930 let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
932 let entry = TxnWalEntry {
933 record_type: WalRecordType::CompensationLogRecord,
934 txn_id,
935 timestamp_us: TxnWalEntry::now_us(),
936 key, value: undo_data.to_vec(),
938 };
939 self.append(&entry)
940 }
941
942 pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
944 let entry = TxnWalEntry {
945 record_type: WalRecordType::Checkpoint,
946 txn_id: 0,
947 timestamp_us: TxnWalEntry::now_us(),
948 key: Vec::new(),
949 value: checkpoint_data.to_vec(),
950 };
951 self.append_sync(&entry)
952 }
953
954 pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
956 let entry = TxnWalEntry {
957 record_type: WalRecordType::CheckpointEnd,
958 txn_id: 0,
959 timestamp_us: TxnWalEntry::now_us(),
960 key: Vec::new(),
961 value: checkpoint_data.to_vec(),
962 };
963 self.append_sync(&entry)
964 }
965
966 pub fn sequence(&self) -> u64 {
968 self.sequence.load(Ordering::SeqCst)
969 }
970
971 pub fn bytes_since_sync(&self) -> u64 {
973 self.bytes_since_sync.load(Ordering::Relaxed)
974 }
975
976 pub fn path(&self) -> &Path {
978 &self.path
979 }
980}
981
/// Snapshot of a `TxnWal`'s counters, as produced by `TxnWal::stats`.
#[derive(Debug, Clone, Default)]
pub struct TxnWalStats {
    /// Value of the WAL's record sequence counter.
    pub entries_written: u64,
    /// Bytes written since the last sync.
    pub bytes_since_sync: u64,
    /// Transaction id that the next allocation will return.
    pub next_txn_id: u64,
}
992
/// WAL variant that stages records in per-shard in-memory buffers, selected by
/// transaction id, and drains them into one shared file on flush.
#[allow(dead_code)]
pub struct ShardedWal {
    /// Per-shard staging buffers, each behind its own mutex.
    shards: Vec<parking_lot::Mutex<WalShard>>,
    /// Number of shards; rounded up to a power of two by `ShardedWal::new`
    /// so a bit mask can be used to pick a shard.
    num_shards: usize,
    /// Buffered writer over the shared log file that shards are drained into.
    central_writer: parking_lot::Mutex<BufWriter<File>>,
    /// Next transaction id handed out by `alloc_txn_id`.
    next_txn_id: AtomicU64,
    /// Count of records appended across all shards.
    sequence: AtomicU64,
    /// Location of the shared log file.
    path: PathBuf,
}
1020
/// In-memory staging area for one shard of a `ShardedWal`.
struct WalShard {
    /// Serialized records waiting to be drained into the central writer.
    buffer: Vec<u8>,
    /// Number of records currently held in `buffer`.
    entry_count: usize,
}
1028
1029impl WalShard {
1030 fn new() -> Self {
1031 Self {
1032 buffer: Vec::with_capacity(64 * 1024), entry_count: 0,
1034 }
1035 }
1036
1037 fn append(&mut self, entry: &TxnWalEntry) {
1038 let bytes = entry.to_bytes();
1039 self.buffer.extend_from_slice(&bytes);
1040 self.entry_count += 1;
1041 }
1042
1043 fn is_empty(&self) -> bool {
1044 self.buffer.is_empty()
1045 }
1046
1047 fn drain(&mut self) -> Vec<u8> {
1048 self.entry_count = 0;
1049 std::mem::take(&mut self.buffer)
1050 }
1051}
1052
1053impl ShardedWal {
1054 pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1058 let path = path.as_ref().to_path_buf();
1059
1060 if let Some(parent) = path.parent() {
1061 std::fs::create_dir_all(parent)?;
1062 }
1063
1064 let file = std::fs::OpenOptions::new()
1065 .create(true)
1066 .append(true)
1067 .read(true)
1068 .open(&path)?;
1069
1070 let num_shards = num_shards.next_power_of_two();
1072 let shards: Vec<_> = (0..num_shards)
1073 .map(|_| parking_lot::Mutex::new(WalShard::new()))
1074 .collect();
1075
1076 Ok(Self {
1077 shards,
1078 num_shards,
1079 central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1080 next_txn_id: AtomicU64::new(1),
1081 sequence: AtomicU64::new(0),
1082 path,
1083 })
1084 }
1085
1086 #[inline]
1088 fn shard_idx(&self, txn_id: u64) -> usize {
1089 (txn_id as usize) & (self.num_shards - 1)
1090 }
1091
1092 pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1094 let shard_idx = self.shard_idx(entry.txn_id);
1095 let mut shard = self.shards[shard_idx].lock();
1096 shard.append(entry);
1097 self.sequence.fetch_add(1, Ordering::SeqCst)
1098 }
1099
1100 pub fn alloc_txn_id(&self) -> u64 {
1102 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1103 }
1104
1105 pub fn flush(&self) -> Result<()> {
1107 let mut central = self.central_writer.lock();
1108
1109 for shard in &self.shards {
1111 let mut shard_guard = shard.lock();
1112 if !shard_guard.is_empty() {
1113 let data = shard_guard.drain();
1114 central.write_all(&data)?;
1115 }
1116 }
1117
1118 central.flush()?;
1119 Ok(())
1120 }
1121
1122 pub fn sync(&self) -> Result<()> {
1124 self.flush()?;
1125 let central = self.central_writer.lock();
1126 central.get_ref().sync_all()?;
1127 Ok(())
1128 }
1129
1130 pub fn begin_transaction(&self) -> Result<u64> {
1132 let txn_id = self.alloc_txn_id();
1133 let entry = TxnWalEntry::txn_begin(txn_id);
1134 self.append(&entry);
1135 Ok(txn_id)
1136 }
1137
1138 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1140 let entry = TxnWalEntry::data(txn_id, key, value);
1141 Ok(self.append(&entry))
1142 }
1143
1144 pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1146 let entry = TxnWalEntry::txn_commit(txn_id);
1147 let seq = self.append(&entry);
1148 self.sync()?; Ok(seq)
1150 }
1151
1152 pub fn stats(&self) -> ShardedWalStats {
1154 let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1155 for shard in &self.shards {
1156 shard_entry_counts.push(shard.lock().entry_count);
1157 }
1158
1159 ShardedWalStats {
1160 num_shards: self.num_shards,
1161 total_entries: self.sequence.load(Ordering::SeqCst),
1162 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1163 shard_entry_counts,
1164 }
1165 }
1166}
1167
/// Snapshot of a `ShardedWal`'s counters, as produced by `ShardedWal::stats`.
#[derive(Debug, Clone)]
pub struct ShardedWalStats {
    /// Number of shards in the WAL.
    pub num_shards: usize,
    /// Value of the WAL's record sequence counter.
    pub total_entries: u64,
    /// Transaction id that the next allocation will return.
    pub next_txn_id: u64,
    /// Records currently staged (not yet drained) in each shard, by shard index.
    pub shard_entry_counts: Vec<usize>,
}
1176
/// Counters reported by `TxnWal::crash_recovery`.
#[derive(Debug, Clone, Default)]
pub struct CrashRecoveryStats {
    /// Records successfully decoded from the log.
    pub total_records: u64,
    /// Transactions with a begin record and a commit record whose writes were recovered.
    pub committed_txns: u64,
    /// Transactions that were begun but neither counted as committed nor aborted.
    pub rolled_back_txns: u64,
    /// Distinct transaction ids for which an abort record was seen.
    pub aborted_txns: u64,
    /// Key/value writes returned from committed transactions.
    pub recovered_writes: u64,
    /// Set to 1 when the scan stopped on a decode error other than end-of-file.
    pub torn_records: u64,
    /// Size of the log file in bytes at the start of recovery.
    pub bytes_read: u64,
    /// Wall time spent in recovery, in microseconds.
    pub recovery_duration_us: u64,
    /// Largest transaction id seen in any decoded record.
    pub max_txn_id: u64,
}
1199
1200impl TxnWal {
1201 pub fn stats(&self) -> TxnWalStats {
1203 TxnWalStats {
1204 entries_written: self.sequence.load(Ordering::SeqCst),
1205 bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1206 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1207 }
1208 }
1209
1210 #[allow(clippy::type_complexity)]
1220 pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1221 let start_time = std::time::Instant::now();
1222 let file = File::open(&self.path)?;
1223 let file_size = file.metadata()?.len();
1224 let mut reader = BufReader::new(file);
1225
1226 let mut stats = CrashRecoveryStats {
1227 bytes_read: file_size,
1228 ..Default::default()
1229 };
1230
1231 let mut committed_txns: HashSet<u64> = HashSet::new();
1232 let mut aborted_txns: HashSet<u64> = HashSet::new();
1233 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1234 std::collections::HashMap::new();
1235 let mut all_txns: HashSet<u64> = HashSet::new();
1236
1237 loop {
1239 match TxnWalEntry::from_reader(&mut reader) {
1240 Ok(entry) => {
1241 stats.total_records += 1;
1242 if entry.txn_id > stats.max_txn_id {
1243 stats.max_txn_id = entry.txn_id;
1244 }
1245
1246 match entry.record_type {
1247 WalRecordType::TxnBegin => {
1248 pending_writes.insert(entry.txn_id, Vec::new());
1249 all_txns.insert(entry.txn_id);
1250 }
1251 WalRecordType::Data => {
1252 if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1253 writes.push((entry.key, entry.value));
1254 }
1255 }
1256 WalRecordType::TxnCommit => {
1257 committed_txns.insert(entry.txn_id);
1258 }
1259 WalRecordType::TxnAbort => {
1260 pending_writes.remove(&entry.txn_id);
1261 aborted_txns.insert(entry.txn_id);
1262 }
1263 _ => {}
1264 }
1265 }
1266 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1267 break;
1269 }
1270 Err(_) => {
1271 stats.torn_records += 1;
1273 break;
1274 }
1275 }
1276 }
1277
1278 let mut result = Vec::new();
1280 for (txn_id, writes) in &pending_writes {
1281 if committed_txns.contains(txn_id) {
1282 stats.committed_txns += 1;
1283 stats.recovered_writes += writes.len() as u64;
1284 result.extend(writes.clone());
1285 }
1286 }
1287
1288 stats.aborted_txns = aborted_txns.len() as u64;
1290 stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1291
1292 stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1293
1294 Ok((result, stats))
1295 }
1296}
1297
// Unit tests for record encoding and for TxnWal replay/recovery.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A record serialized with `to_bytes` decodes back to the same fields.
    #[test]
    fn test_wal_entry_roundtrip() {
        let entry = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        let bytes = entry.to_bytes();

        let mut cursor = std::io::Cursor::new(bytes);
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();

        assert_eq!(recovered.record_type, WalRecordType::Data);
        assert_eq!(recovered.txn_id, 42);
        assert_eq!(recovered.key, b"key");
        assert_eq!(recovered.value, b"value");
    }

    /// Writes of a committed transaction are returned, in order, after reopening the log.
    #[test]
    fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // First session: the WAL is dropped at the end of this scope.
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn_id = wal.begin_transaction().unwrap();
            wal.write(txn_id, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn_id, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn_id).unwrap();
        }

        // Second session: reopen and replay.
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 1);
            assert_eq!(writes.len(), 2);
            assert_eq!(writes[0], (b"k1".to_vec(), b"v1".to_vec()));
            assert_eq!(writes[1], (b"k2".to_vec(), b"v2".to_vec()));
        }
    }

    /// Writes of a transaction that never committed are not returned by replay.
    #[test]
    fn test_uncommitted_transaction_rollback() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // Committed transaction.
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(txn1).unwrap();

            // Transaction left open: no commit record is written for it.
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            // Only the committed transaction and its single write survive.
            assert_eq!(txn_count, 1); assert_eq!(writes.len(), 1);
            assert_eq!(writes[0], (b"committed".to_vec(), b"yes".to_vec()));
        }
    }

    /// Writes of an explicitly aborted transaction are not returned by replay.
    #[test]
    fn test_aborted_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(txn).unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 0);
            assert!(writes.is_empty());
        }
    }

    /// Corrupting the stored checksum makes decoding fail.
    #[test]
    fn test_checksum_validation() {
        let entry = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec());
        let mut bytes = entry.to_bytes();

        // The last byte belongs to the CRC32 trailer; invert it.
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;

        let mut cursor = std::io::Cursor::new(bytes);
        let result = TxnWalEntry::from_reader(&mut cursor);

        assert!(result.is_err());
    }

    /// `crash_recovery` reports committed, aborted and rolled-back transactions separately.
    #[test]
    fn test_crash_recovery_with_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // txn1: committed with two writes.
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn1, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn1).unwrap();

            // txn2: aborted.
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(txn2).unwrap();

            // txn3: committed with one write.
            let txn3 = wal.begin_transaction().unwrap();
            wal.write(txn3, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(txn3).unwrap();

            // txn4: left open, neither committed nor aborted.
            let txn4 = wal.begin_transaction().unwrap();
            wal.write(txn4, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            // Two writes from txn1 plus one from txn3.
            assert_eq!(writes.len(), 3);
            assert_eq!(stats.committed_txns, 2);
            assert_eq!(stats.aborted_txns, 1);
            // txn4 is the only rolled-back transaction.
            assert_eq!(stats.rolled_back_txns, 1); assert_eq!(stats.recovered_writes, 3);
            assert!(stats.recovery_duration_us > 0);
        }
    }

    /// A truncated record at the end of the log is counted as torn and does
    /// not affect the committed data before it.
    #[test]
    fn test_torn_write_detection() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        // Append a partial record: a length prefix of 0x10 followed by only two bytes.
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            file.write_all(&[0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF])
                .unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            assert_eq!(writes.len(), 1);
            assert_eq!(stats.committed_txns, 1);
            assert_eq!(stats.torn_records, 1);
        }
    }

    /// Equal records hash equally, different payloads hash differently, and
    /// the checksum survives a serialize/deserialize round trip.
    #[test]
    fn test_crc32_determinism() {
        let mut entry1 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        // Timestamps are pinned so the two records are byte-for-byte equal.
        entry1.timestamp_us = 12345; let mut entry2 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        entry2.timestamp_us = 12345; assert_eq!(entry1.checksum(), entry2.checksum());

        let mut entry3 = TxnWalEntry::data(42, b"key".to_vec(), b"different".to_vec());
        entry3.timestamp_us = 12345;
        assert_ne!(entry1.checksum(), entry3.checksum());

        let bytes = entry1.to_bytes();
        let mut cursor = std::io::Cursor::new(bytes);
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
        assert_eq!(recovered.checksum(), entry1.checksum());
    }
}