use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Condvar, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant};
45
/// Log sequence number identifying one WAL record.
///
/// LSNs are handed out in increasing order; `0` is used throughout this module
/// as the "no record" sentinel (for example the `prev_lsn` of a `Begin` record).
pub type Lsn = u64;

/// Identifier of the transaction that produced a record.
pub type TxnId = u64;

/// Identifier of the page a record modifies (`0` for records without a page).
pub type PageId = u64;
54
/// Kind of a WAL record, stored on disk as a single byte (`repr(u8)`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// Page modification carrying before and after images.
    Update = 0x01,
    /// Transaction commit.
    Commit = 0x02,
    /// Transaction abort.
    Abort = 0x03,
    /// Compensation log record written while undoing an update; its payload
    /// is the little-endian undo-next LSN.
    Clr = 0x04,
    /// Checkpoint marker.
    Checkpoint = 0x05,
    /// Transaction start.
    Begin = 0x06,
    /// Transaction end.
    End = 0x07,
}
81
82impl WalRecordType {
83 pub fn to_canonical(self) -> sochdb_core::txn::WalRecordType {
85 use sochdb_core::txn::WalRecordType as C;
86 match self {
87 Self::Update => C::PageUpdate,
88 Self::Commit => C::TxnCommit,
89 Self::Abort => C::TxnAbort,
90 Self::Clr => C::CompensationLogRecord,
91 Self::Checkpoint => C::Checkpoint,
92 Self::Begin => C::TxnBegin,
93 Self::End => C::TxnEnd,
94 }
95 }
96
97 pub fn from_canonical(rt: sochdb_core::txn::WalRecordType) -> Option<Self> {
100 use sochdb_core::txn::WalRecordType as C;
101 match rt {
102 C::PageUpdate => Some(Self::Update),
103 C::TxnCommit => Some(Self::Commit),
104 C::TxnAbort => Some(Self::Abort),
105 C::CompensationLogRecord => Some(Self::Clr),
106 C::Checkpoint => Some(Self::Checkpoint),
107 C::TxnBegin => Some(Self::Begin),
108 C::TxnEnd => Some(Self::End),
109 _ => None,
110 }
111 }
112}
113
114impl TryFrom<u8> for WalRecordType {
115 type Error = ();
116
117 fn try_from(value: u8) -> Result<Self, Self::Error> {
118 match value {
119 1 => Ok(WalRecordType::Update),
120 2 => Ok(WalRecordType::Commit),
121 3 => Ok(WalRecordType::Abort),
122 4 => Ok(WalRecordType::Clr),
123 5 => Ok(WalRecordType::Checkpoint),
124 6 => Ok(WalRecordType::Begin),
125 7 => Ok(WalRecordType::End),
126 _ => Err(()),
127 }
128 }
129}
130
/// Fixed-size header that prefixes every WAL record on disk.
///
/// The in-memory layout is `repr(C, packed)`, but the on-disk form is produced
/// field by field (little-endian) by [`WalRecordHeader::serialize`], so it does
/// not depend on that layout. Because the struct is packed, read fields by
/// value (`let lsn = header.lsn;`) rather than taking references to them.
#[derive(Debug, Clone)]
#[repr(C, packed)]
pub struct WalRecordHeader {
    /// LSN of this record.
    pub lsn: u64,
    /// Transaction that wrote the record.
    pub txn_id: u64,
    /// Raw `WalRecordType` discriminant.
    pub record_type: u8,
    /// Previous LSN in this transaction's backward chain (`0` = none).
    pub prev_lsn: u64,
    /// Page touched by the record (`0` for records without a page).
    pub page_id: u64,
    /// Offset supplied by the caller alongside `page_id` (presumably a byte
    /// offset within the page — confirm with callers).
    pub offset: u16,
    /// Total payload length: before image plus after image.
    pub data_length: u32,
    /// Length of the before image, which comes first in the payload.
    pub before_length: u16,
    /// Padding; never serialized (the encoded bytes 41..48 are always zero).
    _reserved: [u8; 5],
}

impl WalRecordHeader {
    /// Size in bytes of an encoded header. Field data occupies bytes 0..41;
    /// the rest is zero padding.
    pub const SIZE: usize = 48;

    /// Encodes the header as its fixed little-endian on-disk layout.
    pub fn serialize(&self) -> [u8; Self::SIZE] {
        let mut out = [0u8; Self::SIZE];
        let mut at = 0;
        // Fields are emitted back to back in declaration order.
        for field in [
            &self.lsn.to_le_bytes()[..],
            &self.txn_id.to_le_bytes()[..],
            &[self.record_type][..],
            &self.prev_lsn.to_le_bytes()[..],
            &self.page_id.to_le_bytes()[..],
            &self.offset.to_le_bytes()[..],
            &self.data_length.to_le_bytes()[..],
            &self.before_length.to_le_bytes()[..],
        ] {
            out[at..at + field.len()].copy_from_slice(field);
            at += field.len();
        }
        out
    }

    /// Decodes a header from the first [`Self::SIZE`] bytes of `buf`.
    ///
    /// Returns `None` if `buf` is shorter than [`Self::SIZE`]. Extra trailing
    /// bytes are ignored, as are the padding bytes.
    pub fn deserialize(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::SIZE {
            return None;
        }

        let mut cursor = buf;
        let lsn = u64::from_le_bytes(Self::next_bytes::<8>(&mut cursor)?);
        let txn_id = u64::from_le_bytes(Self::next_bytes::<8>(&mut cursor)?);
        let [record_type] = Self::next_bytes::<1>(&mut cursor)?;
        let prev_lsn = u64::from_le_bytes(Self::next_bytes::<8>(&mut cursor)?);
        let page_id = u64::from_le_bytes(Self::next_bytes::<8>(&mut cursor)?);
        let offset = u16::from_le_bytes(Self::next_bytes::<2>(&mut cursor)?);
        let data_length = u32::from_le_bytes(Self::next_bytes::<4>(&mut cursor)?);
        let before_length = u16::from_le_bytes(Self::next_bytes::<2>(&mut cursor)?);

        Some(Self {
            lsn,
            txn_id,
            record_type,
            prev_lsn,
            page_id,
            offset,
            data_length,
            before_length,
            _reserved: [0; 5],
        })
    }

    /// Splits the first `N` bytes off `cursor`, or returns `None` if it is too short.
    fn next_bytes<const N: usize>(cursor: &mut &[u8]) -> Option<[u8; N]> {
        let remaining: &[u8] = *cursor;
        let chunk: [u8; N] = remaining.get(..N)?.try_into().ok()?;
        *cursor = &remaining[N..];
        Some(chunk)
    }
}
188
189#[derive(Debug, Clone)]
191pub struct WalRecord {
192 pub header: WalRecordHeader,
193 pub before_image: Vec<u8>,
195 pub after_image: Vec<u8>,
197}
198
199impl WalRecord {
200 pub fn update(
202 lsn: Lsn,
203 txn_id: TxnId,
204 prev_lsn: Lsn,
205 page_id: PageId,
206 offset: u16,
207 before: Vec<u8>,
208 after: Vec<u8>,
209 ) -> Self {
210 Self {
211 header: WalRecordHeader {
212 lsn,
213 txn_id,
214 record_type: WalRecordType::Update as u8,
215 prev_lsn,
216 page_id,
217 offset,
218 data_length: (before.len() + after.len()) as u32,
219 before_length: before.len() as u16,
220 _reserved: [0; 5],
221 },
222 before_image: before,
223 after_image: after,
224 }
225 }
226
227 pub fn commit(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
229 Self {
230 header: WalRecordHeader {
231 lsn,
232 txn_id,
233 record_type: WalRecordType::Commit as u8,
234 prev_lsn,
235 page_id: 0,
236 offset: 0,
237 data_length: 0,
238 before_length: 0,
239 _reserved: [0; 5],
240 },
241 before_image: Vec::new(),
242 after_image: Vec::new(),
243 }
244 }
245
246 pub fn begin(lsn: Lsn, txn_id: TxnId) -> Self {
248 Self {
249 header: WalRecordHeader {
250 lsn,
251 txn_id,
252 record_type: WalRecordType::Begin as u8,
253 prev_lsn: 0,
254 page_id: 0,
255 offset: 0,
256 data_length: 0,
257 before_length: 0,
258 _reserved: [0; 5],
259 },
260 before_image: Vec::new(),
261 after_image: Vec::new(),
262 }
263 }
264
265 pub fn abort(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
267 Self {
268 header: WalRecordHeader {
269 lsn,
270 txn_id,
271 record_type: WalRecordType::Abort as u8,
272 prev_lsn,
273 page_id: 0,
274 offset: 0,
275 data_length: 0,
276 before_length: 0,
277 _reserved: [0; 5],
278 },
279 before_image: Vec::new(),
280 after_image: Vec::new(),
281 }
282 }
283
284 pub fn clr(
286 lsn: Lsn,
287 txn_id: TxnId,
288 prev_lsn: Lsn,
289 page_id: PageId,
290 offset: u16,
291 undo_next_lsn: Lsn, ) -> Self {
293 Self {
294 header: WalRecordHeader {
295 lsn,
296 txn_id,
297 record_type: WalRecordType::Clr as u8,
298 prev_lsn,
299 page_id,
300 offset,
301 data_length: 8,
302 before_length: 0,
303 _reserved: [0; 5],
304 },
305 before_image: Vec::new(),
306 after_image: undo_next_lsn.to_le_bytes().to_vec(),
307 }
308 }
309
310 pub fn serialize(&self) -> Vec<u8> {
312 let mut buf = Vec::with_capacity(
313 WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4,
314 );
315 buf.extend_from_slice(&self.header.serialize());
316 buf.extend_from_slice(&self.before_image);
317 buf.extend_from_slice(&self.after_image);
318
319 let crc = crc32_of(&buf);
321 buf.extend_from_slice(&crc.to_le_bytes());
322 buf
323 }
324
325 pub fn deserialize(buf: &[u8]) -> Option<Self> {
327 if buf.len() < WalRecordHeader::SIZE + 4 {
328 return None;
329 }
330
331 let header = WalRecordHeader::deserialize(buf)?;
332 let data_start = WalRecordHeader::SIZE;
333 let data_end = data_start + header.data_length as usize;
334
335 if buf.len() < data_end + 4 {
336 return None;
337 }
338
339 let expected_crc = u32::from_le_bytes(buf[data_end..data_end + 4].try_into().ok()?);
341 let actual_crc = crc32_of(&buf[..data_end]);
342 if expected_crc != actual_crc {
343 return None;
344 }
345
346 let before_end = data_start + header.before_length as usize;
347 Some(Self {
348 header,
349 before_image: buf[data_start..before_end].to_vec(),
350 after_image: buf[before_end..data_end].to_vec(),
351 })
352 }
353
354 pub fn size(&self) -> usize {
356 WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4
357 }
358}
359
/// Computes the standard CRC-32 of `data`.
///
/// Parameters: reflected polynomial `0xEDB88320`, initial value `0xFFFF_FFFF`,
/// final complement (the IEEE / zlib variant).
///
/// Every WAL record is checksummed on flush and again on every recovery
/// scan. The function therefore uses a 256-entry table built at compile
/// time: one lookup per byte instead of eight shift/branch steps. The
/// results are identical to the bit-at-a-time algorithm.
fn crc32_of(data: &[u8]) -> u32 {
    const TABLE: [u32; 256] = {
        let mut table = [0u32; 256];
        let mut i = 0;
        while i < 256 {
            let mut crc = i as u32;
            let mut bit = 0;
            while bit < 8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ 0xEDB8_8320
                } else {
                    crc >> 1
                };
                bit += 1;
            }
            table[i] = crc;
            i += 1;
        }
        table
    };

    !data.iter().fold(0xFFFF_FFFFu32, |crc, &byte| {
        TABLE[((crc ^ u32::from(byte)) & 0xFF) as usize] ^ (crc >> 8)
    })
}
375
376#[derive(Debug)]
378pub struct GroupCommitBuffer {
379 records: Vec<WalRecord>,
381 bytes: usize,
383 waiters: Vec<(TxnId, std::sync::mpsc::Sender<Result<Lsn, WalError>>)>,
385 last_flush: Instant,
387}
388
389impl GroupCommitBuffer {
390 fn new() -> Self {
391 Self {
392 records: Vec::with_capacity(128),
393 bytes: 0,
394 waiters: Vec::new(),
395 last_flush: Instant::now(),
396 }
397 }
398
399 fn add_record(&mut self, record: WalRecord) {
400 self.bytes += record.size();
401 self.records.push(record);
402 }
403
404 fn add_waiter(
405 &mut self,
406 txn_id: TxnId,
407 sender: std::sync::mpsc::Sender<Result<Lsn, WalError>>,
408 ) {
409 self.waiters.push((txn_id, sender));
410 }
411
412 fn should_flush(&self, config: &WalConfig) -> bool {
413 self.bytes >= config.buffer_size
414 || self.records.len() >= config.max_batch_size
415 || self.last_flush.elapsed() >= config.flush_interval
416 }
417
418 fn clear(&mut self) {
419 self.records.clear();
420 self.bytes = 0;
421 self.waiters.clear();
422 self.last_flush = Instant::now();
423 }
424}
425
/// Tuning knobs for [`WriteAheadLog`] buffering and durability.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Flush once the buffered records reach this many encoded bytes.
    pub buffer_size: usize,
    /// Flush once this many records are buffered.
    pub max_batch_size: usize,
    /// Flush once this much time has passed since the last flush. The check
    /// only runs when a record is appended; there is no timer.
    pub flush_interval: Duration,
    /// How written data is forced to stable storage after each flush.
    pub sync_mode: SyncMode,
    /// Checkpoint interval. NOTE(review): not read anywhere in this file —
    /// confirm its unit (records? LSNs?) with its consumer.
    pub checkpoint_interval: u64,
}

impl Default for WalConfig {
    /// 1 MiB / 1000-record / 10 ms flush thresholds with full `fsync`.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        WalConfig {
            sync_mode: SyncMode::Fsync,
            flush_interval: Duration::from_millis(10),
            buffer_size: ONE_MIB,
            max_batch_size: 1_000,
            checkpoint_interval: 100_000,
        }
    }
}

/// Durability step performed after a flush writes to the segment file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Do not sync; written data may remain only in OS buffers.
    None,
    /// Call `File::sync_all` (data and metadata).
    Fsync,
    /// Call `File::sync_data` (data, plus only the metadata needed to read it).
    FdataSync,
}
463
/// Errors produced by the write-ahead log.
#[derive(Debug, Clone)]
pub enum WalError {
    /// Underlying I/O failure, carrying the `io::Error` message.
    Io(String),
    /// Data read back failed validation (e.g. a CRC mismatch).
    Corruption(String),
    /// A record header could not be decoded.
    InvalidRecord,
    /// Buffer capacity exceeded. NOTE(review): not constructed in this file.
    BufferFull,
}

impl std::fmt::Display for WalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(detail) => f.write_fmt(format_args!("WAL I/O error: {}", detail)),
            Self::Corruption(detail) => {
                f.write_fmt(format_args!("WAL corruption: {}", detail))
            }
            Self::InvalidRecord => f.write_str("Invalid WAL record"),
            Self::BufferFull => f.write_str("WAL buffer full"),
        }
    }
}

impl std::error::Error for WalError {}
485
486#[allow(dead_code)]
488pub struct WriteAheadLog {
489 dir: PathBuf,
491 file: Mutex<File>,
493 file_number: AtomicU64,
495 lsn: AtomicU64,
497 buffer: Mutex<GroupCommitBuffer>,
499 flush_cv: Condvar,
501 config: WalConfig,
503 stats: WalStats,
505 running: AtomicBool,
507 flushed_lsn: AtomicU64,
509 txn_prev_lsn: RwLock<HashMap<TxnId, Lsn>>,
511}
512
/// Cumulative counters describing WAL flush activity.
///
/// The WAL updates these with `Ordering::Relaxed`: they are statistics, not
/// synchronization points.
#[derive(Debug, Default)]
pub struct WalStats {
    /// Records written to the log file.
    pub records_written: AtomicU64,
    /// Bytes written to the log file.
    pub bytes_written: AtomicU64,
    /// Number of flushes performed.
    pub flushes: AtomicU64,
    /// Records summed across all flushes (numerator of `avg_batch_size`).
    pub total_batch_records: AtomicU64,
    /// Microseconds spent flushing, summed across all flushes.
    pub total_flush_time_us: AtomicU64,
}

impl WalStats {
    /// Mean number of records per flush, or `0.0` before the first flush.
    pub fn avg_batch_size(&self) -> f64 {
        self.per_flush(&self.total_batch_records)
    }

    /// Mean flush duration in microseconds, or `0.0` before the first flush.
    pub fn avg_flush_time_us(&self) -> f64 {
        self.per_flush(&self.total_flush_time_us)
    }

    /// Divides `total` by the flush count, treating zero flushes as `0.0`.
    fn per_flush(&self, total: &AtomicU64) -> f64 {
        match self.flushes.load(Ordering::Relaxed) {
            0 => 0.0,
            n => total.load(Ordering::Relaxed) as f64 / n as f64,
        }
    }
}
545
546impl WriteAheadLog {
547 pub fn open(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self, WalError> {
549 let dir = dir.as_ref().to_path_buf();
550 std::fs::create_dir_all(&dir).map_err(|e| WalError::Io(e.to_string()))?;
551
552 let file_number = Self::find_latest_file(&dir).unwrap_or(0);
554 let file_path = dir.join(format!("wal_{:08}.log", file_number));
555
556 let file = OpenOptions::new()
557 .create(true)
558 .read(true)
559 .append(true)
560 .open(&file_path)
561 .map_err(|e| WalError::Io(e.to_string()))?;
562
563 let lsn = Self::find_last_lsn(&file_path).unwrap_or(0);
565
566 Ok(Self {
567 dir,
568 file: Mutex::new(file),
569 file_number: AtomicU64::new(file_number),
570 lsn: AtomicU64::new(lsn),
571 buffer: Mutex::new(GroupCommitBuffer::new()),
572 flush_cv: Condvar::new(),
573 config,
574 stats: WalStats::default(),
575 running: AtomicBool::new(true),
576 flushed_lsn: AtomicU64::new(lsn),
577 txn_prev_lsn: RwLock::new(HashMap::new()),
578 })
579 }
580
581 fn find_latest_file(dir: &Path) -> Option<u64> {
582 std::fs::read_dir(dir)
583 .ok()?
584 .filter_map(|e| e.ok())
585 .filter_map(|e| {
586 let name = e.file_name().to_string_lossy().to_string();
587 if name.starts_with("wal_") && name.ends_with(".log") {
588 name[4..12].parse::<u64>().ok()
589 } else {
590 None
591 }
592 })
593 .max()
594 }
595
596 fn find_last_lsn(path: &Path) -> Option<Lsn> {
597 let mut file = File::open(path).ok()?;
598 let mut lsn = 0u64;
599 let mut buf = [0u8; WalRecordHeader::SIZE];
600
601 while let Ok(n) = file.read(&mut buf) {
602 if n < WalRecordHeader::SIZE {
603 break;
604 }
605 if let Some(header) = WalRecordHeader::deserialize(&buf) {
606 lsn = header.lsn;
607 let skip = header.data_length as i64 + 4; if file.seek(SeekFrom::Current(skip)).is_err() {
610 break;
611 }
612 } else {
613 break;
614 }
615 }
616
617 Some(lsn)
618 }
619
620 pub fn next_lsn(&self) -> Lsn {
622 self.lsn.fetch_add(1, Ordering::SeqCst) + 1
623 }
624
625 pub fn current_lsn(&self) -> Lsn {
627 self.lsn.load(Ordering::SeqCst)
628 }
629
630 pub fn flushed_lsn(&self) -> Lsn {
632 self.flushed_lsn.load(Ordering::Acquire)
633 }
634
635 pub fn begin_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
637 let lsn = self.next_lsn();
638 let record = WalRecord::begin(lsn, txn_id);
639
640 {
641 let mut prev_lsn = self.txn_prev_lsn.write().unwrap_or_else(|e| e.into_inner());
642 prev_lsn.insert(txn_id, lsn);
643 }
644
645 self.append(record)?;
646 Ok(lsn)
647 }
648
649 pub fn log_update(
651 &self,
652 txn_id: TxnId,
653 page_id: PageId,
654 offset: u16,
655 before: Vec<u8>,
656 after: Vec<u8>,
657 ) -> Result<Lsn, WalError> {
658 let lsn = self.next_lsn();
659 let prev_lsn = {
660 let prev_lsn = self.txn_prev_lsn.read().unwrap_or_else(|e| e.into_inner());
661 prev_lsn.get(&txn_id).copied().unwrap_or(0)
662 };
663
664 let record = WalRecord::update(lsn, txn_id, prev_lsn, page_id, offset, before, after);
665
666 {
667 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap_or_else(|e| e.into_inner());
668 prev_lsn_map.insert(txn_id, lsn);
669 }
670
671 self.append(record)?;
672 Ok(lsn)
673 }
674
675 pub fn commit_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
677 let lsn = self.next_lsn();
678 let prev_lsn = {
679 let prev_lsn = self.txn_prev_lsn.read().unwrap_or_else(|e| e.into_inner());
680 prev_lsn.get(&txn_id).copied().unwrap_or(0)
681 };
682
683 let record = WalRecord::commit(lsn, txn_id, prev_lsn);
684
685 let (tx, rx) = std::sync::mpsc::channel();
687
688 {
689 let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
690 buffer.add_record(record);
691 buffer.add_waiter(txn_id, tx);
692
693 if buffer.should_flush(&self.config) {
694 self.flush_buffer_locked(&mut buffer)?;
695 }
696 }
697
698 {
700 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap_or_else(|e| e.into_inner());
701 prev_lsn_map.remove(&txn_id);
702 }
703
704 rx.recv()
706 .map_err(|_| WalError::Io("Channel closed".to_string()))?
707 }
708
709 pub fn abort_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
711 let lsn = self.next_lsn();
712 let prev_lsn = {
713 let prev_lsn = self.txn_prev_lsn.read().unwrap_or_else(|e| e.into_inner());
714 prev_lsn.get(&txn_id).copied().unwrap_or(0)
715 };
716
717 let record = WalRecord::abort(lsn, txn_id, prev_lsn);
718
719 {
720 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap_or_else(|e| e.into_inner());
721 prev_lsn_map.remove(&txn_id);
722 }
723
724 self.append(record)?;
725 self.force_flush()?;
726 Ok(lsn)
727 }
728
729 fn append(&self, record: WalRecord) -> Result<(), WalError> {
731 let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
732 buffer.add_record(record);
733
734 if buffer.should_flush(&self.config) {
735 self.flush_buffer_locked(&mut buffer)?;
736 }
737
738 Ok(())
739 }
740
741 pub fn force_flush(&self) -> Result<Lsn, WalError> {
743 let mut buffer = self.buffer.lock().unwrap_or_else(|e| e.into_inner());
744 if !buffer.records.is_empty() {
745 self.flush_buffer_locked(&mut buffer)?;
746 }
747 Ok(self.flushed_lsn.load(Ordering::Acquire))
748 }
749
750 fn flush_buffer_locked(&self, buffer: &mut GroupCommitBuffer) -> Result<(), WalError> {
752 if buffer.records.is_empty() {
753 return Ok(());
754 }
755
756 let start = Instant::now();
757 let record_count = buffer.records.len() as u64;
758
759 let mut data = Vec::with_capacity(buffer.bytes);
761 let mut last_lsn = 0;
762 for record in &buffer.records {
763 last_lsn = record.header.lsn;
764 data.extend(record.serialize());
765 }
766
767 {
769 let mut file = self.file.lock().unwrap_or_else(|e| e.into_inner());
770 file.write_all(&data)
771 .map_err(|e| WalError::Io(e.to_string()))?;
772
773 match self.config.sync_mode {
775 SyncMode::Fsync => {
776 file.sync_all().map_err(|e| WalError::Io(e.to_string()))?;
777 }
778 SyncMode::FdataSync => {
779 file.sync_data().map_err(|e| WalError::Io(e.to_string()))?;
780 }
781 SyncMode::None => {}
782 }
783 }
784
785 self.flushed_lsn.store(last_lsn, Ordering::Release);
787
788 let elapsed_us = start.elapsed().as_micros() as u64;
790 self.stats
791 .records_written
792 .fetch_add(record_count, Ordering::Relaxed);
793 self.stats
794 .bytes_written
795 .fetch_add(data.len() as u64, Ordering::Relaxed);
796 self.stats.flushes.fetch_add(1, Ordering::Relaxed);
797 self.stats
798 .total_batch_records
799 .fetch_add(record_count, Ordering::Relaxed);
800 self.stats
801 .total_flush_time_us
802 .fetch_add(elapsed_us, Ordering::Relaxed);
803
804 for (_, sender) in buffer.waiters.drain(..) {
806 let _ = sender.send(Ok(last_lsn));
807 }
808
809 buffer.clear();
810 Ok(())
811 }
812
813 pub fn stats(&self) -> &WalStats {
815 &self.stats
816 }
817
818 pub fn recover<R: RecoveryHandler>(&self, handler: &mut R) -> Result<RecoveryStats, WalError> {
820 let start = Instant::now();
821
822 let (dirty_pages, active_txns, last_checkpoint) = self.analysis_pass()?;
824
825 let redo_start = dirty_pages
827 .values()
828 .min()
829 .copied()
830 .unwrap_or(last_checkpoint);
831 let redo_count = self.redo_pass(redo_start, handler)?;
832
833 let undo_count = self.undo_pass(&active_txns, handler)?;
835
836 Ok(RecoveryStats {
837 analysis_time: start.elapsed(),
838 redo_records: redo_count,
839 undo_records: undo_count,
840 dirty_pages: dirty_pages.len(),
841 active_txns: active_txns.len(),
842 })
843 }
844
845 #[allow(clippy::type_complexity)]
847 fn analysis_pass(&self) -> Result<(HashMap<PageId, Lsn>, HashMap<TxnId, Lsn>, Lsn), WalError> {
848 let mut dirty_pages: HashMap<PageId, Lsn> = HashMap::new();
849 let mut active_txns: HashMap<TxnId, Lsn> = HashMap::new();
850 let mut last_checkpoint = 0;
851
852 for record in self.iter_records()? {
853 let record = record?;
854 let lsn = record.header.lsn;
855 let txn_id = record.header.txn_id;
856
857 match WalRecordType::try_from(record.header.record_type) {
858 Ok(WalRecordType::Begin) => {
859 active_txns.insert(txn_id, lsn);
860 }
861 Ok(WalRecordType::Update) => {
862 let page_id = record.header.page_id;
864 dirty_pages.entry(page_id).or_insert(lsn);
865 active_txns.insert(txn_id, lsn);
867 }
868 Ok(WalRecordType::Commit) | Ok(WalRecordType::Abort) | Ok(WalRecordType::End) => {
869 active_txns.remove(&txn_id);
870 }
871 Ok(WalRecordType::Clr) => {
872 active_txns.insert(txn_id, lsn);
873 }
874 Ok(WalRecordType::Checkpoint) => {
875 last_checkpoint = lsn;
876 }
877 Err(_) => {}
878 }
879 }
880
881 Ok((dirty_pages, active_txns, last_checkpoint))
882 }
883
884 fn redo_pass<R: RecoveryHandler>(
886 &self,
887 start_lsn: Lsn,
888 handler: &mut R,
889 ) -> Result<u64, WalError> {
890 let mut count = 0;
891
892 for record in self.iter_records_from(start_lsn)? {
893 let record = record?;
894
895 match WalRecordType::try_from(record.header.record_type) {
896 Ok(WalRecordType::Update) => {
897 handler.redo(&record)?;
898 count += 1;
899 }
900 Ok(WalRecordType::Clr) => {
901 count += 1;
903 }
904 _ => {}
905 }
906 }
907
908 Ok(count)
909 }
910
911 fn undo_pass<R: RecoveryHandler>(
913 &self,
914 active_txns: &HashMap<TxnId, Lsn>,
915 handler: &mut R,
916 ) -> Result<u64, WalError> {
917 let mut count = 0;
918
919 let mut undo_list: VecDeque<(Lsn, TxnId)> = active_txns
921 .iter()
922 .map(|(&txn_id, &lsn)| (lsn, txn_id))
923 .collect();
924 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
925
926 while let Some((lsn, txn_id)) = undo_list.pop_front() {
927 if lsn == 0 {
928 continue;
929 }
930
931 if let Some(record) = self.read_record_at(lsn)? {
933 match WalRecordType::try_from(record.header.record_type) {
934 Ok(WalRecordType::Update) => {
935 handler.undo(&record)?;
937 count += 1;
938
939 let clr_lsn = self.next_lsn();
941 let clr = WalRecord::clr(
942 clr_lsn,
943 txn_id,
944 lsn,
945 record.header.page_id,
946 record.header.offset,
947 record.header.prev_lsn,
948 );
949 self.append(clr)?;
950
951 if record.header.prev_lsn > 0 {
953 undo_list.push_back((record.header.prev_lsn, txn_id));
954 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
955 }
956 }
957 Ok(WalRecordType::Clr) => {
958 if record.after_image.len() >= 8 {
960 let bytes: [u8; 8] = record.after_image[0..8]
961 .try_into()
962 .unwrap_or([0; 8]);
963 let undo_next = u64::from_le_bytes(bytes);
964 if undo_next > 0 {
965 undo_list.push_back((undo_next, txn_id));
966 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
967 }
968 }
969 }
970 _ => {
971 if record.header.prev_lsn > 0 {
973 undo_list.push_back((record.header.prev_lsn, txn_id));
974 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
975 }
976 }
977 }
978 }
979 }
980
981 self.force_flush()?;
982 Ok(count)
983 }
984
985 fn read_record_at(&self, target_lsn: Lsn) -> Result<Option<WalRecord>, WalError> {
987 for record in self.iter_records()? {
988 let record = record?;
989 if record.header.lsn == target_lsn {
990 return Ok(Some(record));
991 }
992 if record.header.lsn > target_lsn {
993 break;
994 }
995 }
996 Ok(None)
997 }
998
999 fn iter_records(&self) -> Result<WalIterator, WalError> {
1001 self.iter_records_from(0)
1002 }
1003
1004 fn iter_records_from(&self, start_lsn: Lsn) -> Result<WalIterator, WalError> {
1006 let file_path = self.dir.join(format!(
1007 "wal_{:08}.log",
1008 self.file_number.load(Ordering::Relaxed)
1009 ));
1010
1011 let file = File::open(&file_path).map_err(|e| WalError::Io(e.to_string()))?;
1012
1013 Ok(WalIterator {
1014 file,
1015 start_lsn,
1016 started: false,
1017 })
1018 }
1019}
1020
1021pub trait RecoveryHandler {
1023 fn redo(&mut self, record: &WalRecord) -> Result<(), WalError>;
1024 fn undo(&mut self, record: &WalRecord) -> Result<(), WalError>;
1025}
1026
1027pub struct NoOpRecoveryHandler;
1029
1030impl RecoveryHandler for NoOpRecoveryHandler {
1031 fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
1032 Ok(())
1033 }
1034 fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
1035 Ok(())
1036 }
1037}
1038
/// Summary of one `WriteAheadLog::recover` run.
#[derive(Debug, Clone)]
pub struct RecoveryStats {
    /// Wall-clock time of the whole recovery (analysis, redo and undo),
    /// despite the field name.
    pub analysis_time: Duration,
    /// Update records redone, plus CLRs seen during redo.
    pub redo_records: u64,
    /// Update records undone.
    pub undo_records: u64,
    /// Pages found dirty by the analysis pass.
    pub dirty_pages: usize,
    /// Transactions the analysis pass left unfinished.
    pub active_txns: usize,
}
1048
1049pub struct WalIterator {
1051 file: File,
1052 start_lsn: Lsn,
1053 started: bool,
1054}
1055
1056impl Iterator for WalIterator {
1057 type Item = Result<WalRecord, WalError>;
1058
1059 fn next(&mut self) -> Option<Self::Item> {
1060 let mut header_buf = [0u8; WalRecordHeader::SIZE];
1061
1062 match self.file.read_exact(&mut header_buf) {
1063 Ok(()) => {}
1064 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
1065 Err(e) => return Some(Err(WalError::Io(e.to_string()))),
1066 }
1067
1068 let header = match WalRecordHeader::deserialize(&header_buf) {
1069 Some(h) => h,
1070 None => return Some(Err(WalError::InvalidRecord)),
1071 };
1072
1073 if !self.started {
1075 if header.lsn < self.start_lsn {
1076 let skip = header.data_length as i64 + 4;
1078 if let Err(e) = self.file.seek(SeekFrom::Current(skip)) {
1079 return Some(Err(WalError::Io(e.to_string())));
1080 }
1081 return self.next();
1082 }
1083 self.started = true;
1084 }
1085
1086 let data_len = header.data_length as usize;
1088 let mut data_buf = vec![0u8; data_len + 4]; match self.file.read_exact(&mut data_buf) {
1091 Ok(()) => {}
1092 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
1093 Err(e) => return Some(Err(WalError::Io(e.to_string()))),
1094 }
1095
1096 let mut full_buf = Vec::with_capacity(WalRecordHeader::SIZE + data_len);
1098 full_buf.extend_from_slice(&header_buf);
1099 full_buf.extend_from_slice(&data_buf[..data_len]);
1100
1101 let crc_bytes: [u8; 4] = match data_buf[data_len..data_len + 4].try_into() {
1102 Ok(b) => b,
1103 Err(_) => return Some(Err(WalError::Corruption("CRC bytes truncated".to_string()))),
1104 };
1105 let expected_crc = u32::from_le_bytes(crc_bytes);
1106 let actual_crc = crc32_of(&full_buf);
1107
1108 if expected_crc != actual_crc {
1109 return Some(Err(WalError::Corruption("CRC mismatch".to_string())));
1110 }
1111
1112 let before_end = header.before_length as usize;
1113 Some(Ok(WalRecord {
1114 header,
1115 before_image: data_buf[..before_end].to_vec(),
1116 after_image: data_buf[before_end..data_len].to_vec(),
1117 }))
1118 }
1119}
1120
// Unit tests for the WAL.
//
// NOTE(review): most file-backed tests below are `#[ignore]`d. Presumably this
// is at least partly because `commit_txn` can block on its reply channel when
// none of the buffer's flush thresholds is met. Confirm the reasons before
// re-enabling them.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU64;
    use tempfile::TempDir;
    use sochdb_core::ValidityBitmap;

    // Round-trips an Update record through serialize/deserialize.
    #[test]
    fn test_wal_record_serialization() {
        let record = WalRecord::update(1, 100, 0, 1000, 0, vec![1, 2, 3, 4], vec![5, 6, 7, 8]);

        let serialized = record.serialize();
        let deserialized = WalRecord::deserialize(&serialized).unwrap();

        // Copy packed header fields into locals; assert_eq! takes references.
        let lsn = deserialized.header.lsn;
        let txn_id = deserialized.header.txn_id;

        assert_eq!(lsn, 1);
        assert_eq!(txn_id, 100);
        assert_eq!(deserialized.before_image, vec![1, 2, 3, 4]);
        assert_eq!(deserialized.after_image, vec![5, 6, 7, 8]);
    }

    // Begin, update, commit: LSNs must increase and all three records must be written.
    #[test]
    #[ignore]
    fn test_wal_basic_operations() {
        let dir = TempDir::new().unwrap();
        let config = WalConfig {
            sync_mode: SyncMode::None,
            ..Default::default()
        };

        let wal = WriteAheadLog::open(dir.path(), config).unwrap();

        let begin_lsn = wal.begin_txn(1).unwrap();
        assert!(begin_lsn > 0);

        let update_lsn = wal.log_update(1, 100, 0, vec![0; 10], vec![1; 10]).unwrap();
        assert!(update_lsn > begin_lsn);

        let commit_lsn = wal.commit_txn(1).unwrap();
        assert!(commit_lsn > update_lsn);

        assert!(wal.stats().records_written.load(Ordering::Relaxed) >= 3);
    }

    // Buffers 20 records with high thresholds, forces a flush and reports batching.
    #[test]
    #[ignore]
    fn test_wal_group_commit() {
        let dir = TempDir::new().unwrap();
        let config = WalConfig {
            sync_mode: SyncMode::None,
            buffer_size: 10000,
            max_batch_size: 100,
            flush_interval: Duration::from_secs(10),
            ..Default::default()
        };

        let wal = WriteAheadLog::open(dir.path(), config).unwrap();

        for i in 0..10 {
            wal.begin_txn(i).unwrap();
            wal.log_update(i, 100 + i, 0, vec![0; 10], vec![1; 10])
                .unwrap();
        }

        wal.force_flush().unwrap();

        let stats = wal.stats();
        let flushes = stats.flushes.load(Ordering::Relaxed);
        let records = stats.records_written.load(Ordering::Relaxed);

        // 10 Begin + 10 Update records.
        assert!(records >= 20);
        println!(
            "Flushes: {}, Records: {}, Avg batch: {:.1}",
            flushes,
            records,
            stats.avg_batch_size()
        );
    }

    // The checksum is non-zero, deterministic, and sensitive to a one-byte change.
    #[test]
    fn test_crc32() {
        let data = b"hello world";
        let crc = crc32_of(data);
        assert_ne!(crc, 0);

        let crc2 = crc32_of(data);
        assert_eq!(crc, crc2);

        let data2 = b"hello World";
        let crc3 = crc32_of(data2);
        assert_ne!(crc, crc3);
    }

    // After a forced flush, iteration sees exactly the three records written.
    #[test]
    #[ignore]
    fn test_wal_iterator() {
        let dir = TempDir::new().unwrap();
        let config = WalConfig {
            sync_mode: SyncMode::None,
            ..Default::default()
        };

        let wal = WriteAheadLog::open(dir.path(), config).unwrap();

        wal.begin_txn(1).unwrap();
        wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
            .unwrap();
        wal.log_update(1, 101, 0, vec![7, 8, 9], vec![10, 11, 12])
            .unwrap();
        wal.force_flush().unwrap();

        let count = wal.iter_records().unwrap().count();
        // Begin + 2 Updates.
        assert_eq!(count, 3);
    }

    // Records committed by one WAL instance are visible after reopening the directory.
    #[test]
    #[ignore]
    fn test_wal_persistence() {
        let dir = TempDir::new().unwrap();
        let config = WalConfig {
            sync_mode: SyncMode::None,
            ..Default::default()
        };

        {
            let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();
            wal.begin_txn(1).unwrap();
            wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
                .unwrap();
            wal.commit_txn(1).unwrap();
        }

        {
            let wal = WriteAheadLog::open(dir.path(), config).unwrap();
            let count = wal.iter_records().unwrap().count();
            // Begin + Update + Commit.
            assert_eq!(count, 3);
        }
    }

    // Analysis: a committed txn is not active; an uncommitted one is, and its page is dirty.
    #[test]
    #[ignore]
    fn test_wal_recovery_analysis() {
        let dir = TempDir::new().unwrap();
        let config = WalConfig {
            sync_mode: SyncMode::None,
            ..Default::default()
        };

        let wal = WriteAheadLog::open(dir.path(), config).unwrap();

        wal.begin_txn(1).unwrap();
        wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
        wal.commit_txn(1).unwrap();

        wal.begin_txn(2).unwrap();
        wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
        wal.force_flush().unwrap();

        let (dirty_pages, active_txns, _) = wal.analysis_pass().unwrap();

        assert!(!active_txns.contains_key(&1));
        assert!(active_txns.contains_key(&2));
        assert!(dirty_pages.contains_key(&200));
    }

    // Handler that only counts redo/undo callbacks.
    struct TestRecoveryHandler {
        redo_count: AtomicU64,
        undo_count: AtomicU64,
    }

    impl RecoveryHandler for TestRecoveryHandler {
        fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
            self.redo_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
        fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
            self.undo_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    // Full recovery after reopen: txn 1 committed, txn 2 left unfinished.
    #[test]
    #[ignore]
    fn test_wal_full_recovery() {
        let dir = TempDir::new().unwrap();
        let config = WalConfig {
            sync_mode: SyncMode::None,
            ..Default::default()
        };

        {
            let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();

            wal.begin_txn(1).unwrap();
            wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
            wal.commit_txn(1).unwrap();

            wal.begin_txn(2).unwrap();
            wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
            wal.log_update(2, 201, 0, vec![9, 10], vec![11, 12])
                .unwrap();
            wal.force_flush().unwrap();
        }

        {
            let wal = WriteAheadLog::open(dir.path(), config).unwrap();
            let mut handler = TestRecoveryHandler {
                redo_count: AtomicU64::new(0),
                undo_count: AtomicU64::new(0),
            };

            let stats = wal.recover(&mut handler).unwrap();

            // All three updates (one from txn 1, two from txn 2) are redone.
            assert_eq!(stats.redo_records, 3);

            // Only txn 2's two updates are undone.
            assert_eq!(stats.undo_records, 2);

            // Txn 2 is the only unfinished transaction.
            assert_eq!(stats.active_txns, 1);
        }
    }

    // NOTE(review): exercises sochdb_core::ValidityBitmap, not this WAL module;
    // consider moving it next to that type.
    #[test]
    #[ignore]
    fn test_validity_bitmap() {
        let mut bitmap = ValidityBitmap::new_all_valid(100);
        assert_eq!(bitmap.len(), 100);
        assert_eq!(bitmap.null_count(), 0);

        for i in 0..100 {
            assert!(bitmap.is_valid(i));
        }

        bitmap.set_null(10);
        bitmap.set_null(50);
        bitmap.set_null(99);

        assert_eq!(bitmap.null_count(), 3);
        assert!(!bitmap.is_valid(10));
        assert!(!bitmap.is_valid(50));
        assert!(!bitmap.is_valid(99));
        assert!(bitmap.is_valid(11));
    }
}