1use byteorder::{LittleEndian, ReadBytesExt};
52use parking_lot::Mutex;
53use std::cell::Cell;
54use std::collections::HashSet;
55use std::fs::{File, OpenOptions};
56use std::io::{BufReader, BufWriter, Read, Write};
57use std::path::{Path, PathBuf};
58use std::sync::atomic::{AtomicU64, Ordering};
59use std::time::{Instant, SystemTime, UNIX_EPOCH};
60use sochdb_core::{Result, SochDBError, WalRecordType};
61
/// How long, in nanoseconds, a thread keeps extrapolating from its cached wall-clock
/// reading before asking the system clock again.
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Per-thread `(monotonic anchor, wall-clock microseconds at that anchor)`.
    ///
    /// The initial wall-clock value of `0` marks a cache that has never been seeded.
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
}

/// Returns the current time as microseconds since the UNIX epoch, using a per-thread
/// cache to avoid a system-clock read on every call.
///
/// While the cached reading is younger than [`CACHE_VALIDITY_NS`], the result is the
/// cached wall-clock value plus the monotonic time elapsed since it was taken.
/// Otherwise the system clock is read and the cache is refreshed.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (anchor, anchor_us) = cache.get();
        let elapsed_ns = anchor.elapsed().as_nanos() as u64;

        // The thread-local is initialised lazily with a wall-clock value of 0, so a
        // "fresh" anchor alone does not mean the cache holds a real timestamp. Without
        // the `anchor_us != 0` check the first millisecond of every thread would
        // report times right at the epoch.
        if anchor_us != 0 && elapsed_ns < CACHE_VALIDITY_NS {
            return anchor_us + elapsed_ns / 1000;
        }

        let now_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before UNIX epoch (1970-01-01)")
            .as_micros() as u64;
        cache.set((Instant::now(), now_us));
        now_us
    })
}
112
/// Number of bytes that precede the key in a serialized record:
/// length prefix (4) + record type (1) + transaction id (8) + timestamp (8)
/// + key length (4) + value length (4).
const RECORD_HEADER_SIZE: usize = 4 + 1 + 8 + 8 + 4 + 4;
/// Size in bytes of the CRC32 trailer that ends every serialized record.
const CHECKSUM_SIZE: usize = 4;

/// Initial capacity (32 KiB) reserved by `TxnWalBuffer::new`.
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 * 1024;
121
/// In-memory staging area that collects serialized `Data` records for a single
/// transaction so they can be handed to a `TxnWal` in one write.
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction id stamped into every record appended to this buffer.
    txn_id: u64,
    /// Serialized records stored back to back.
    buffer: Vec<u8>,
    /// Number of records currently held in `buffer`.
    entry_count: usize,
}
167
168impl TxnWalBuffer {
169 #[inline]
171 pub fn new(txn_id: u64) -> Self {
172 Self {
173 txn_id,
174 buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
175 entry_count: 0,
176 }
177 }
178
179 #[inline]
181 pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
182 Self {
183 txn_id,
184 buffer: Vec::with_capacity(capacity),
185 entry_count: 0,
186 }
187 }
188
189 #[inline]
196 pub fn append(&mut self, key: &[u8], value: &[u8]) {
197 let timestamp_us = cached_timestamp_us();
199
200 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
201 let entry_start = self.buffer.len();
202
203 self.buffer.extend_from_slice(&[0u8; 4]);
205
206 let mut hasher = crc32fast::Hasher::new();
207
208 let record_type_byte = WalRecordType::Data as u8;
210 self.buffer.push(record_type_byte);
211 hasher.update(&[record_type_byte]);
212
213 let txn_bytes = self.txn_id.to_le_bytes();
215 self.buffer.extend_from_slice(&txn_bytes);
216 hasher.update(&txn_bytes);
217
218 let ts_bytes = timestamp_us.to_le_bytes();
220 self.buffer.extend_from_slice(&ts_bytes);
221 hasher.update(&ts_bytes);
222
223 let key_len_bytes = (key.len() as u32).to_le_bytes();
225 self.buffer.extend_from_slice(&key_len_bytes);
226 hasher.update(&key_len_bytes);
227
228 let val_len_bytes = (value.len() as u32).to_le_bytes();
230 self.buffer.extend_from_slice(&val_len_bytes);
231 hasher.update(&val_len_bytes);
232
233 self.buffer.extend_from_slice(key);
235 hasher.update(key);
236
237 self.buffer.extend_from_slice(value);
239 hasher.update(value);
240
241 self.buffer
243 .extend_from_slice(&hasher.finalize().to_le_bytes());
244
245 let content_len = (total_len - 4) as u32;
247 self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
248
249 self.entry_count += 1;
250 }
251
252 #[inline]
260 pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
261 wal.flush_buffer(self)
262 }
263
264 #[inline]
266 pub fn clear(&mut self) {
267 self.buffer.clear();
268 self.entry_count = 0;
269 }
270
271 #[inline]
273 pub fn entry_count(&self) -> usize {
274 self.entry_count
275 }
276
277 #[inline]
279 pub fn bytes_buffered(&self) -> usize {
280 self.buffer.len()
281 }
282
283 #[inline]
285 pub fn is_empty(&self) -> bool {
286 self.buffer.is_empty()
287 }
288}
289
/// A single decoded write-ahead-log record.
#[derive(Debug, Clone)]
pub struct TxnWalEntry {
    /// Kind of record (data, begin, commit, abort, checkpoint, ...).
    pub record_type: WalRecordType,
    /// Transaction this record belongs to.
    pub txn_id: u64,
    /// Creation time in microseconds since the UNIX epoch.
    pub timestamp_us: u64,
    /// Key bytes; empty for the control records built by this type's constructors.
    pub key: Vec<u8>,
    /// Value bytes; carries the payload for data and schema-change records.
    pub value: Vec<u8>,
}
304
305impl TxnWalEntry {
306 pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
308 Self {
309 record_type: WalRecordType::Data,
310 txn_id,
311 timestamp_us: Self::now_us(),
312 key,
313 value,
314 }
315 }
316
317 pub fn txn_begin(txn_id: u64) -> Self {
319 Self {
320 record_type: WalRecordType::TxnBegin,
321 txn_id,
322 timestamp_us: Self::now_us(),
323 key: Vec::new(),
324 value: Vec::new(),
325 }
326 }
327
328 pub fn txn_commit(txn_id: u64) -> Self {
330 Self {
331 record_type: WalRecordType::TxnCommit,
332 txn_id,
333 timestamp_us: Self::now_us(),
334 key: Vec::new(),
335 value: Vec::new(),
336 }
337 }
338
339 pub fn txn_abort(txn_id: u64) -> Self {
341 Self {
342 record_type: WalRecordType::TxnAbort,
343 txn_id,
344 timestamp_us: Self::now_us(),
345 key: Vec::new(),
346 value: Vec::new(),
347 }
348 }
349
350 pub fn checkpoint(txn_id: u64) -> Self {
352 Self {
353 record_type: WalRecordType::Checkpoint,
354 txn_id,
355 timestamp_us: Self::now_us(),
356 key: Vec::new(),
357 value: Vec::new(),
358 }
359 }
360
361 pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
363 Self {
364 record_type: WalRecordType::SchemaChange,
365 txn_id,
366 timestamp_us: Self::now_us(),
367 key: Vec::new(),
368 value: schema_data,
369 }
370 }
371
372 #[inline]
374 fn now_us() -> u64 {
375 cached_timestamp_us()
376 }
377
378 pub fn checksum(&self) -> u32 {
384 let mut hasher = crc32fast::Hasher::new();
385 hasher.update(&[self.record_type as u8]);
386 hasher.update(&self.txn_id.to_le_bytes());
387 hasher.update(&self.timestamp_us.to_le_bytes());
388 hasher.update(&(self.key.len() as u32).to_le_bytes());
389 hasher.update(&(self.value.len() as u32).to_le_bytes());
390 hasher.update(&self.key);
391 hasher.update(&self.value);
392 hasher.finalize()
393 }
394
395 pub fn to_bytes(&self) -> Vec<u8> {
400 let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
401 let mut buf = Vec::with_capacity(total_len);
402 let mut hasher = crc32fast::Hasher::new();
403
404 let content_len = (total_len - 4) as u32;
406 buf.extend_from_slice(&content_len.to_le_bytes());
407
408 let record_type_byte = self.record_type as u8;
410 buf.push(record_type_byte);
411 hasher.update(&[record_type_byte]);
412
413 let txn_bytes = self.txn_id.to_le_bytes();
415 buf.extend_from_slice(&txn_bytes);
416 hasher.update(&txn_bytes);
417
418 let ts_bytes = self.timestamp_us.to_le_bytes();
420 buf.extend_from_slice(&ts_bytes);
421 hasher.update(&ts_bytes);
422
423 let key_len_bytes = (self.key.len() as u32).to_le_bytes();
425 buf.extend_from_slice(&key_len_bytes);
426 hasher.update(&key_len_bytes);
427
428 let val_len_bytes = (self.value.len() as u32).to_le_bytes();
430 buf.extend_from_slice(&val_len_bytes);
431 hasher.update(&val_len_bytes);
432
433 buf.extend_from_slice(&self.key);
435 hasher.update(&self.key);
436
437 buf.extend_from_slice(&self.value);
439 hasher.update(&self.value);
440
441 buf.extend_from_slice(&hasher.finalize().to_le_bytes());
443
444 buf
445 }
446
447 pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
454 let content_len = reader.read_u32::<LittleEndian>()?;
456 if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
457 return Err(SochDBError::Corruption("WAL entry too short".into()));
458 }
459
460 let record_type_byte = reader.read_u8()?;
462 let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
463 SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
464 })?;
465
466 let txn_id = reader.read_u64::<LittleEndian>()?;
468
469 let timestamp_us = reader.read_u64::<LittleEndian>()?;
471
472 let key_len = reader.read_u32::<LittleEndian>()? as usize;
474
475 let value_len = reader.read_u32::<LittleEndian>()? as usize;
477
478 let mut key = vec![0u8; key_len];
480 reader.read_exact(&mut key)?;
481
482 let mut value = vec![0u8; value_len];
484 reader.read_exact(&mut value)?;
485
486 let stored_checksum = reader.read_u32::<LittleEndian>()?;
488
489 let entry = Self {
490 record_type,
491 txn_id,
492 timestamp_us,
493 key,
494 value,
495 };
496
497 if entry.checksum() != stored_checksum {
499 return Err(SochDBError::Corruption(format!(
500 "WAL checksum mismatch for txn_id {}: expected {}, got {}",
501 txn_id,
502 entry.checksum(),
503 stored_checksum
504 )));
505 }
506
507 Ok(entry)
508 }
509}
510
/// Append-only transactional write-ahead log backed by a single file.
pub struct TxnWal {
    /// Location of the log file.
    path: PathBuf,
    /// Buffered writer over the log file; the mutex serializes all appends.
    writer: Mutex<BufWriter<File>>,
    /// Next transaction id handed out by `alloc_txn_id`.
    next_txn_id: AtomicU64,
    /// Number of records appended; also used as the sequence number returned to callers.
    sequence: AtomicU64,
    /// Bytes appended since the last `sync` or `truncate`.
    bytes_since_sync: AtomicU64,
    /// Timestamp (microseconds since the UNIX epoch) reused by `write_no_flush_refs`
    /// between periodic refreshes.
    cached_timestamp_us: AtomicU64,
}
527
528impl TxnWal {
529 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
531 let path = path.as_ref().to_path_buf();
532
533 if let Some(parent) = path.parent() {
535 std::fs::create_dir_all(parent)?;
536 }
537
538 let file = OpenOptions::new()
539 .create(true)
540 .append(true)
541 .read(true)
542 .open(&path)?;
543
544 let now_us = cached_timestamp_us();
547
548 let wal = Self {
549 path,
550 writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
551 next_txn_id: AtomicU64::new(1),
552 sequence: AtomicU64::new(0),
553 bytes_since_sync: AtomicU64::new(0),
554 cached_timestamp_us: AtomicU64::new(now_us),
555 };
556
557 wal.recover_state()?;
559
560 Ok(wal)
561 }
562
563 fn recover_state(&self) -> Result<()> {
570 let file = File::open(&self.path)?;
571 let mut reader = BufReader::new(file);
572 let mut count: u64 = 0;
573
574 let our_pid = std::process::id() as u64;
579 let pid_base = our_pid << 32;
580 let mut max_our_counter: u64 = 0;
581
582 loop {
583 match TxnWalEntry::from_reader(&mut reader) {
584 Ok(entry) => {
585 count += 1;
586 let entry_pid = entry.txn_id >> 32;
588 if entry_pid == our_pid {
589 let entry_counter = entry.txn_id & 0xFFFF_FFFF;
590 if entry_counter > max_our_counter {
591 max_our_counter = entry_counter;
592 }
593 }
594 }
595 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
596 break;
597 }
598 Err(_) => {
599 break;
601 }
602 }
603 }
604
605 let next_id = pid_base + max_our_counter + 1;
611
612 self.next_txn_id.store(next_id, Ordering::SeqCst);
613 self.sequence.store(count, Ordering::SeqCst);
614
615 Ok(())
616 }
617
618 #[inline]
623 fn get_cached_timestamp(&self) -> u64 {
624 let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
626
627 let seq = self.sequence.load(Ordering::Relaxed);
630 if seq & 0x3FF == 0 {
631 let now_us = cached_timestamp_us();
633 self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
634 return now_us;
635 }
636
637 cached
638 }
639
640 pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
656 let bytes = entry.to_bytes();
657 let mut writer = self.writer.lock();
658
659 writer.write_all(&bytes)?;
660 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
664 self.bytes_since_sync
665 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
666
667 Ok(seq)
668 }
669
670 #[inline]
674 pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
675 let bytes = entry.to_bytes();
676 let mut writer = self.writer.lock();
677
678 writer.write_all(&bytes)?;
679 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
682 self.bytes_since_sync
683 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
684
685 Ok(seq)
686 }
687
688 #[inline]
690 pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
691 let entry = TxnWalEntry::data(txn_id, key, value);
692 self.append_no_flush(&entry)
693 }
694
695 #[inline]
700 pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
701 let timestamp_us = self.get_cached_timestamp();
703
704 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
705 let mut hasher = crc32fast::Hasher::new();
706
707 let mut writer = self.writer.lock();
708
709 let content_len = (total_len - 4) as u32;
711 writer.write_all(&content_len.to_le_bytes())?;
712
713 let record_type_byte = WalRecordType::Data as u8;
715 writer.write_all(&[record_type_byte])?;
716 hasher.update(&[record_type_byte]);
717
718 let txn_bytes = txn_id.to_le_bytes();
720 writer.write_all(&txn_bytes)?;
721 hasher.update(&txn_bytes);
722
723 let ts_bytes = timestamp_us.to_le_bytes();
725 writer.write_all(&ts_bytes)?;
726 hasher.update(&ts_bytes);
727
728 let key_len_bytes = (key.len() as u32).to_le_bytes();
730 writer.write_all(&key_len_bytes)?;
731 hasher.update(&key_len_bytes);
732
733 let val_len_bytes = (value.len() as u32).to_le_bytes();
735 writer.write_all(&val_len_bytes)?;
736 hasher.update(&val_len_bytes);
737
738 writer.write_all(key)?;
740 hasher.update(key);
741
742 writer.write_all(value)?;
744 hasher.update(value);
745
746 writer.write_all(&hasher.finalize().to_le_bytes())?;
748
749 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
750 self.bytes_since_sync
751 .fetch_add(total_len as u64, Ordering::Relaxed);
752
753 Ok(seq)
754 }
755
756 pub fn flush(&self) -> Result<()> {
758 let mut writer = self.writer.lock();
759 writer.flush()?;
760 Ok(())
761 }
762
763 pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
765 let seq = self.append(entry)?;
766 self.sync()?;
767 Ok(seq)
768 }
769
770 pub fn sync(&self) -> Result<()> {
772 let writer = self.writer.lock();
773 writer.get_ref().sync_all()?;
774 self.bytes_since_sync.store(0, Ordering::Relaxed);
775 Ok(())
776 }
777
778 #[inline]
792 pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
793 if buffer.is_empty() {
794 return Ok(0);
795 }
796
797 let mut writer = self.writer.lock();
798 writer.write_all(&buffer.buffer)?;
799
800 let seq = self
801 .sequence
802 .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
803 self.bytes_since_sync
804 .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
805
806 Ok(seq)
807 }
808
809 pub fn size_bytes(&self) -> u64 {
811 std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
812 }
813
814 pub fn alloc_txn_id(&self) -> u64 {
816 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
817 }
818
819 pub fn begin_transaction(&self) -> Result<u64> {
821 let txn_id = self.alloc_txn_id();
822 let entry = TxnWalEntry::txn_begin(txn_id);
823 self.append(&entry)?;
824 Ok(txn_id)
825 }
826
827 pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
843 self.flush()?;
845
846 let entry = TxnWalEntry::txn_commit(txn_id);
848 self.append_sync(&entry)?;
849 Ok(())
850 }
851
852 pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
870 for &txn_id in txn_ids {
872 let entry = TxnWalEntry::txn_commit(txn_id);
873 self.append_no_flush(&entry)?;
874 }
875
876 self.flush()?;
878 self.sync()?;
879 Ok(())
880 }
881
882 pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
884 let entry = TxnWalEntry::txn_abort(txn_id);
885 self.append(&entry)?;
886 Ok(())
887 }
888
889 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
891 let entry = TxnWalEntry::data(txn_id, key, value);
892 self.append(&entry)
893 }
894
895 #[allow(clippy::type_complexity)]
899 pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
900 let file = File::open(&self.path)?;
901 let mut reader = BufReader::new(file);
902
903 let mut committed_txns: HashSet<u64> = HashSet::new();
904 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
905 std::collections::HashMap::new();
906
907 loop {
909 match TxnWalEntry::from_reader(&mut reader) {
910 Ok(entry) => match entry.record_type {
911 WalRecordType::TxnBegin => {
912 pending_writes
915 .entry(entry.txn_id)
916 .or_insert_with(Vec::new);
917 }
918 WalRecordType::Data => {
919 pending_writes
923 .entry(entry.txn_id)
924 .or_insert_with(Vec::new)
925 .push((entry.key, entry.value));
926 }
927 WalRecordType::TxnCommit => {
928 committed_txns.insert(entry.txn_id);
929 }
930 WalRecordType::TxnAbort => {
931 pending_writes.remove(&entry.txn_id);
932 }
933 _ => {}
934 },
935 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
936 break;
937 }
938 Err(_) => {
939 break;
940 }
941 }
942 }
943
944 let mut result = Vec::new();
946 let mut txn_count = 0;
947
948 for (txn_id, writes) in pending_writes {
949 if committed_txns.contains(&txn_id) {
950 result.extend(writes);
951 txn_count += 1;
952 }
953 }
955
956 Ok((result, txn_count))
957 }
958
959 pub fn replay<F>(&self, mut callback: F) -> Result<u64>
961 where
962 F: FnMut(TxnWalEntry) -> Result<()>,
963 {
964 let file = File::open(&self.path)?;
965 let mut reader = BufReader::new(file);
966 let mut count = 0u64;
967
968 loop {
969 match TxnWalEntry::from_reader(&mut reader) {
970 Ok(entry) => {
971 callback(entry)?;
972 count += 1;
973 }
974 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
975 break;
976 }
977 Err(e) => {
978 eprintln!("WAL replay warning: {:?}", e);
980 break;
981 }
982 }
983 }
984
985 Ok(count)
986 }
987
988 pub fn truncate(&self) -> Result<()> {
999 let mut writer = self.writer.lock();
1000 writer.flush()?;
1002 let file = writer.get_ref();
1003 file.set_len(0)?;
1004 file.sync_all()?;
1005 self.sequence.store(0, Ordering::SeqCst);
1006 self.bytes_since_sync.store(0, Ordering::Relaxed);
1007 Ok(())
1008 }
1009
1010 pub fn write_checkpoint(&self) -> Result<u64> {
1012 let entry = TxnWalEntry::checkpoint(0);
1013 self.append_sync(&entry)
1014 }
1015
1016 pub fn append_clr(
1022 &self,
1023 txn_id: u64,
1024 _original_lsn: u64,
1025 undo_next_lsn: Option<u64>,
1026 undo_data: &[u8],
1027 ) -> Result<u64> {
1028 let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1030 let entry = TxnWalEntry {
1031 record_type: WalRecordType::CompensationLogRecord,
1032 txn_id,
1033 timestamp_us: TxnWalEntry::now_us(),
1034 key, value: undo_data.to_vec(),
1036 };
1037 self.append(&entry)
1038 }
1039
1040 pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1042 let entry = TxnWalEntry {
1043 record_type: WalRecordType::Checkpoint,
1044 txn_id: 0,
1045 timestamp_us: TxnWalEntry::now_us(),
1046 key: Vec::new(),
1047 value: checkpoint_data.to_vec(),
1048 };
1049 self.append_sync(&entry)
1050 }
1051
1052 pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1054 let entry = TxnWalEntry {
1055 record_type: WalRecordType::CheckpointEnd,
1056 txn_id: 0,
1057 timestamp_us: TxnWalEntry::now_us(),
1058 key: Vec::new(),
1059 value: checkpoint_data.to_vec(),
1060 };
1061 self.append_sync(&entry)
1062 }
1063
1064 pub fn sequence(&self) -> u64 {
1066 self.sequence.load(Ordering::SeqCst)
1067 }
1068
1069 pub fn bytes_since_sync(&self) -> u64 {
1071 self.bytes_since_sync.load(Ordering::Relaxed)
1072 }
1073
1074 pub fn path(&self) -> &Path {
1076 &self.path
1077 }
1078}
1079
/// Point-in-time counters reported by `TxnWal::stats`.
#[derive(Debug, Clone, Default)]
pub struct TxnWalStats {
    /// Value of the log's sequence counter (records appended).
    pub entries_written: u64,
    /// Bytes appended since the last sync.
    pub bytes_since_sync: u64,
    /// Transaction id that the next allocation will return.
    pub next_txn_id: u64,
}
1090
/// Write-ahead log that stages records in per-shard in-memory buffers and merges
/// them into one file on flush.
#[allow(dead_code)]
pub struct ShardedWal {
    /// Staging buffers; a record goes to the shard selected by its transaction id.
    shards: Vec<parking_lot::Mutex<WalShard>>,
    /// Number of shards; always a power of two so selection can use a bit mask.
    num_shards: usize,
    /// Buffered writer over the shared log file.
    central_writer: parking_lot::Mutex<BufWriter<File>>,
    /// Next transaction id handed out by `alloc_txn_id`.
    next_txn_id: AtomicU64,
    /// Number of records staged so far.
    sequence: AtomicU64,
    /// Location of the log file.
    path: PathBuf,
}
1118
/// One in-memory staging buffer of a `ShardedWal`.
struct WalShard {
    /// Serialized records waiting to be written to the central file.
    buffer: Vec<u8>,
    /// Number of records currently held in `buffer`.
    entry_count: usize,
}
1126
1127impl WalShard {
1128 fn new() -> Self {
1129 Self {
1130 buffer: Vec::with_capacity(64 * 1024), entry_count: 0,
1132 }
1133 }
1134
1135 fn append(&mut self, entry: &TxnWalEntry) {
1136 let bytes = entry.to_bytes();
1137 self.buffer.extend_from_slice(&bytes);
1138 self.entry_count += 1;
1139 }
1140
1141 fn is_empty(&self) -> bool {
1142 self.buffer.is_empty()
1143 }
1144
1145 fn drain(&mut self) -> Vec<u8> {
1146 self.entry_count = 0;
1147 std::mem::take(&mut self.buffer)
1148 }
1149}
1150
1151impl ShardedWal {
1152 pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1156 let path = path.as_ref().to_path_buf();
1157
1158 if let Some(parent) = path.parent() {
1159 std::fs::create_dir_all(parent)?;
1160 }
1161
1162 let file = std::fs::OpenOptions::new()
1163 .create(true)
1164 .append(true)
1165 .read(true)
1166 .open(&path)?;
1167
1168 let num_shards = num_shards.next_power_of_two();
1170 let shards: Vec<_> = (0..num_shards)
1171 .map(|_| parking_lot::Mutex::new(WalShard::new()))
1172 .collect();
1173
1174 Ok(Self {
1175 shards,
1176 num_shards,
1177 central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1178 next_txn_id: AtomicU64::new(1),
1179 sequence: AtomicU64::new(0),
1180 path,
1181 })
1182 }
1183
1184 #[inline]
1186 fn shard_idx(&self, txn_id: u64) -> usize {
1187 (txn_id as usize) & (self.num_shards - 1)
1188 }
1189
1190 pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1192 let shard_idx = self.shard_idx(entry.txn_id);
1193 let mut shard = self.shards[shard_idx].lock();
1194 shard.append(entry);
1195 self.sequence.fetch_add(1, Ordering::SeqCst)
1196 }
1197
1198 pub fn alloc_txn_id(&self) -> u64 {
1200 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1201 }
1202
1203 pub fn flush(&self) -> Result<()> {
1205 let mut central = self.central_writer.lock();
1206
1207 for shard in &self.shards {
1209 let mut shard_guard = shard.lock();
1210 if !shard_guard.is_empty() {
1211 let data = shard_guard.drain();
1212 central.write_all(&data)?;
1213 }
1214 }
1215
1216 central.flush()?;
1217 Ok(())
1218 }
1219
1220 pub fn sync(&self) -> Result<()> {
1222 self.flush()?;
1223 let central = self.central_writer.lock();
1224 central.get_ref().sync_all()?;
1225 Ok(())
1226 }
1227
1228 pub fn begin_transaction(&self) -> Result<u64> {
1230 let txn_id = self.alloc_txn_id();
1231 let entry = TxnWalEntry::txn_begin(txn_id);
1232 self.append(&entry);
1233 Ok(txn_id)
1234 }
1235
1236 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1238 let entry = TxnWalEntry::data(txn_id, key, value);
1239 Ok(self.append(&entry))
1240 }
1241
1242 pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1244 let entry = TxnWalEntry::txn_commit(txn_id);
1245 let seq = self.append(&entry);
1246 self.sync()?; Ok(seq)
1248 }
1249
1250 pub fn stats(&self) -> ShardedWalStats {
1252 let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1253 for shard in &self.shards {
1254 shard_entry_counts.push(shard.lock().entry_count);
1255 }
1256
1257 ShardedWalStats {
1258 num_shards: self.num_shards,
1259 total_entries: self.sequence.load(Ordering::SeqCst),
1260 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1261 shard_entry_counts,
1262 }
1263 }
1264}
1265
/// Point-in-time counters reported by `ShardedWal::stats`.
#[derive(Debug, Clone)]
pub struct ShardedWalStats {
    /// Number of staging shards.
    pub num_shards: usize,
    /// Value of the sequence counter (records staged so far).
    pub total_entries: u64,
    /// Transaction id that the next allocation will return.
    pub next_txn_id: u64,
    /// Records currently staged in each shard, indexed by shard.
    pub shard_entry_counts: Vec<usize>,
}
1274
/// Summary produced by `TxnWal::crash_recovery`.
#[derive(Debug, Clone, Default)]
pub struct CrashRecoveryStats {
    /// Records successfully parsed from the log.
    pub total_records: u64,
    /// Committed transactions whose writes were recovered.
    pub committed_txns: u64,
    /// Transactions that were begun but neither committed nor aborted.
    pub rolled_back_txns: u64,
    /// Distinct transactions for which an abort record was seen.
    pub aborted_txns: u64,
    /// Key/value writes returned to the caller.
    pub recovered_writes: u64,
    /// Incremented when parsing stopped on an error other than an unexpected end of file.
    pub torn_records: u64,
    /// Size of the log file in bytes, taken from its metadata when recovery started.
    pub bytes_read: u64,
    /// Time spent in recovery, in microseconds.
    pub recovery_duration_us: u64,
    /// Highest transaction id seen in any parsed record.
    pub max_txn_id: u64,
}
1297
1298impl TxnWal {
1299 pub fn stats(&self) -> TxnWalStats {
1301 TxnWalStats {
1302 entries_written: self.sequence.load(Ordering::SeqCst),
1303 bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1304 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1305 }
1306 }
1307
1308 #[allow(clippy::type_complexity)]
1318 pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1319 let start_time = std::time::Instant::now();
1320 let file = File::open(&self.path)?;
1321 let file_size = file.metadata()?.len();
1322 let mut reader = BufReader::new(file);
1323
1324 let mut stats = CrashRecoveryStats {
1325 bytes_read: file_size,
1326 ..Default::default()
1327 };
1328
1329 let mut committed_txns: HashSet<u64> = HashSet::new();
1330 let mut aborted_txns: HashSet<u64> = HashSet::new();
1331 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1332 std::collections::HashMap::new();
1333 let mut all_txns: HashSet<u64> = HashSet::new();
1334
1335 loop {
1337 match TxnWalEntry::from_reader(&mut reader) {
1338 Ok(entry) => {
1339 stats.total_records += 1;
1340 if entry.txn_id > stats.max_txn_id {
1341 stats.max_txn_id = entry.txn_id;
1342 }
1343
1344 match entry.record_type {
1345 WalRecordType::TxnBegin => {
1346 pending_writes.insert(entry.txn_id, Vec::new());
1347 all_txns.insert(entry.txn_id);
1348 }
1349 WalRecordType::Data => {
1350 if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1351 writes.push((entry.key, entry.value));
1352 }
1353 }
1354 WalRecordType::TxnCommit => {
1355 committed_txns.insert(entry.txn_id);
1356 }
1357 WalRecordType::TxnAbort => {
1358 pending_writes.remove(&entry.txn_id);
1359 aborted_txns.insert(entry.txn_id);
1360 }
1361 _ => {}
1362 }
1363 }
1364 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1365 break;
1367 }
1368 Err(_) => {
1369 stats.torn_records += 1;
1371 break;
1372 }
1373 }
1374 }
1375
1376 let mut result = Vec::new();
1378 for (txn_id, writes) in &pending_writes {
1379 if committed_txns.contains(txn_id) {
1380 stats.committed_txns += 1;
1381 stats.recovered_writes += writes.len() as u64;
1382 result.extend(writes.clone());
1383 }
1384 }
1385
1386 stats.aborted_txns = aborted_txns.len() as u64;
1388 stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1389
1390 stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1391
1392 Ok((result, stats))
1393 }
1394}
1395
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Creates a scratch directory and the WAL path inside it. The returned guard
    /// must stay alive for as long as the path is in use.
    fn scratch_wal() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");
        (dir, wal_path)
    }

    /// Shorthand for an owned key/value pair.
    fn kv(key: &[u8], value: &[u8]) -> (Vec<u8>, Vec<u8>) {
        (key.to_vec(), value.to_vec())
    }

    /// A serialized entry decodes back to the same fields.
    #[test]
    fn test_wal_entry_roundtrip() {
        let encoded = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec()).to_bytes();

        let decoded = TxnWalEntry::from_reader(&mut std::io::Cursor::new(encoded)).unwrap();

        assert_eq!(decoded.record_type, WalRecordType::Data);
        assert_eq!(decoded.txn_id, 42);
        assert_eq!(decoded.key, b"key");
        assert_eq!(decoded.value, b"value");
    }

    /// Writes of a committed transaction survive a reopen.
    #[test]
    fn test_wal_append_and_replay() {
        let (_dir, wal_path) = scratch_wal();

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn_id = wal.begin_transaction().unwrap();
            for (k, v) in [kv(b"k1", b"v1"), kv(b"k2", b"v2")] {
                wal.write(txn_id, k, v).unwrap();
            }
            wal.commit_transaction(txn_id).unwrap();
        }

        let reopened = TxnWal::new(&wal_path).unwrap();
        let (writes, txn_count) = reopened.replay_for_recovery().unwrap();

        assert_eq!(txn_count, 1);
        assert_eq!(writes.len(), 2);
        assert_eq!(writes[0], (b"k1".to_vec(), b"v1".to_vec()));
        assert_eq!(writes[1], (b"k2".to_vec(), b"v2".to_vec()));
    }

    /// A transaction without a commit record is not replayed.
    #[test]
    fn test_uncommitted_transaction_rollback() {
        let (_dir, wal_path) = scratch_wal();

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let committed = wal.begin_transaction().unwrap();
            wal.write(committed, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(committed).unwrap();

            let dangling = wal.begin_transaction().unwrap();
            wal.write(dangling, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
        }

        let reopened = TxnWal::new(&wal_path).unwrap();
        let (writes, txn_count) = reopened.replay_for_recovery().unwrap();

        assert_eq!(txn_count, 1);
        assert_eq!(writes.len(), 1);
        assert_eq!(writes[0], (b"committed".to_vec(), b"yes".to_vec()));
    }

    /// An aborted transaction contributes nothing to recovery.
    #[test]
    fn test_aborted_transaction() {
        let (_dir, wal_path) = scratch_wal();

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(txn).unwrap();
        }

        let reopened = TxnWal::new(&wal_path).unwrap();
        let (writes, txn_count) = reopened.replay_for_recovery().unwrap();

        assert_eq!(txn_count, 0);
        assert!(writes.is_empty());
    }

    /// Flipping a checksum byte makes decoding fail.
    #[test]
    fn test_checksum_validation() {
        let mut encoded = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec()).to_bytes();

        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;

        let outcome = TxnWalEntry::from_reader(&mut std::io::Cursor::new(encoded));

        assert!(outcome.is_err());
    }

    /// Recovery statistics distinguish committed, aborted and dangling transactions.
    #[test]
    fn test_crash_recovery_with_stats() {
        let (_dir, wal_path) = scratch_wal();

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn1, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn1).unwrap();

            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(txn2).unwrap();

            let txn3 = wal.begin_transaction().unwrap();
            wal.write(txn3, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(txn3).unwrap();

            let txn4 = wal.begin_transaction().unwrap();
            wal.write(txn4, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
        }

        let reopened = TxnWal::new(&wal_path).unwrap();
        let (writes, stats) = reopened.crash_recovery().unwrap();

        assert_eq!(writes.len(), 3);
        assert_eq!(stats.committed_txns, 2);
        assert_eq!(stats.aborted_txns, 1);
        assert_eq!(stats.rolled_back_txns, 1);
        assert_eq!(stats.recovered_writes, 3);
        assert!(stats.recovery_duration_us > 0);
    }

    /// Garbage appended after valid records is reported as a torn record.
    #[test]
    fn test_torn_write_detection() {
        let (_dir, wal_path) = scratch_wal();

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        {
            use std::io::Write;
            // A length prefix of 16 followed by two stray bytes: an incomplete record.
            let torn_tail = [0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF];
            let mut raw = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            raw.write_all(&torn_tail).unwrap();
        }

        let reopened = TxnWal::new(&wal_path).unwrap();
        let (writes, stats) = reopened.crash_recovery().unwrap();

        assert_eq!(writes.len(), 1);
        assert_eq!(stats.committed_txns, 1);
        assert_eq!(stats.torn_records, 1);
    }

    /// Equal entries hash equally, different payloads hash differently, and the
    /// checksum survives a serialization round trip.
    #[test]
    fn test_crc32_determinism() {
        let fixed_time_entry = |value: &[u8]| {
            let mut entry = TxnWalEntry::data(42, b"key".to_vec(), value.to_vec());
            entry.timestamp_us = 12345;
            entry
        };

        let entry1 = fixed_time_entry(b"value");
        let entry2 = fixed_time_entry(b"value");
        assert_eq!(entry1.checksum(), entry2.checksum());

        let entry3 = fixed_time_entry(b"different");
        assert_ne!(entry1.checksum(), entry3.checksum());

        let mut cursor = std::io::Cursor::new(entry1.to_bytes());
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
        assert_eq!(recovered.checksum(), entry1.checksum());
    }
}