1use std::collections::{HashMap, VecDeque};
39use std::fs::{File, OpenOptions};
40use std::io::{self, Read, Seek, SeekFrom, Write};
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
43use std::sync::{Condvar, Mutex, RwLock};
44use std::time::{Duration, Instant};
45
/// Log sequence number carried in every WAL record header.
pub type Lsn = u64;

/// Identifier of a transaction that owns WAL records.
pub type TxnId = u64;

/// Identifier of a page referenced by update and CLR records.
pub type PageId = u64;
54
/// Kind tag stored as a single byte in each WAL record header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// Page modification carrying before/after images.
    Update = 1,
    /// Transaction commit marker.
    Commit = 2,
    /// Transaction abort marker.
    Abort = 3,
    /// Compensation log record written while undoing an update.
    Clr = 4,
    /// Checkpoint marker.
    Checkpoint = 5,
    /// Transaction begin marker.
    Begin = 6,
    /// Transaction end marker.
    End = 7,
}

impl TryFrom<u8> for WalRecordType {
    type Error = ();

    /// Maps a raw tag byte back to its variant; any byte outside `1..=7`
    /// yields `Err(())`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        const ALL_KINDS: [WalRecordType; 7] = [
            WalRecordType::Update,
            WalRecordType::Commit,
            WalRecordType::Abort,
            WalRecordType::Clr,
            WalRecordType::Checkpoint,
            WalRecordType::Begin,
            WalRecordType::End,
        ];

        ALL_KINDS
            .iter()
            .copied()
            .find(|kind| *kind as u8 == value)
            .ok_or(())
    }
}
91
/// Fixed-layout header that precedes the payload of every WAL record.
///
/// The struct is `repr(C, packed)`, so fields must be copied into locals
/// before a reference to them is taken.
#[derive(Debug, Clone)]
#[repr(C, packed)]
pub struct WalRecordHeader {
    /// LSN of this record.
    pub lsn: u64,
    /// Transaction the record belongs to.
    pub txn_id: u64,
    /// Raw `WalRecordType` tag byte.
    pub record_type: u8,
    /// LSN of the previous record of the same transaction (0 when none).
    pub prev_lsn: u64,
    /// Page the record refers to (0 for records without a page).
    pub page_id: u64,
    /// Caller-supplied offset (presumably within the page — confirm with callers).
    pub offset: u16,
    /// Total payload length: before image plus after image, in bytes.
    pub data_length: u32,
    /// Length of the before image at the start of the payload.
    pub before_length: u16,
    _reserved: [u8; 5],
}

impl WalRecordHeader {
    /// Number of bytes a serialized header occupies.
    pub const SIZE: usize = 48;

    /// Encodes the header as little-endian fields laid out back to back;
    /// bytes after `before_length` (41..48) stay zero.
    pub fn serialize(&self) -> [u8; Self::SIZE] {
        let mut out = [0u8; Self::SIZE];
        let mut cursor = 0usize;
        // Writes `bytes` at the running cursor and advances it.
        let mut put = |bytes: &[u8]| {
            let end = cursor + bytes.len();
            out[cursor..end].copy_from_slice(bytes);
            cursor = end;
        };

        put(&self.lsn.to_le_bytes());
        put(&self.txn_id.to_le_bytes());
        put(&[self.record_type]);
        put(&self.prev_lsn.to_le_bytes());
        put(&self.page_id.to_le_bytes());
        put(&self.offset.to_le_bytes());
        put(&self.data_length.to_le_bytes());
        put(&self.before_length.to_le_bytes());

        out
    }

    /// Decodes a header from the first `SIZE` bytes of `buf`.
    ///
    /// Returns `None` when `buf` is shorter than `SIZE`. The reserved bytes
    /// are not read; they are always reset to zero.
    pub fn deserialize(buf: &[u8]) -> Option<Self> {
        let raw = buf.get(..Self::SIZE)?;
        let u64_at = |at: usize| -> Option<u64> {
            Some(u64::from_le_bytes(raw[at..at + 8].try_into().ok()?))
        };

        let lsn = u64_at(0)?;
        let txn_id = u64_at(8)?;
        let record_type = raw[16];
        let prev_lsn = u64_at(17)?;
        let page_id = u64_at(25)?;
        let offset = u16::from_le_bytes(raw[33..35].try_into().ok()?);
        let data_length = u32::from_le_bytes(raw[35..39].try_into().ok()?);
        let before_length = u16::from_le_bytes(raw[39..41].try_into().ok()?);

        Some(Self {
            lsn,
            txn_id,
            record_type,
            prev_lsn,
            page_id,
            offset,
            data_length,
            before_length,
            _reserved: [0; 5],
        })
    }
}
149
/// A complete WAL record: the fixed header plus its variable-length payload.
#[derive(Debug, Clone)]
pub struct WalRecord {
    /// Header describing the record and the payload lengths.
    pub header: WalRecordHeader,
    /// First part of the payload (`header.before_length` bytes); empty for
    /// records built by `commit`, `begin`, `abort` and `clr`.
    pub before_image: Vec<u8>,
    /// Remainder of the payload; CLR records store the undo-next LSN here as
    /// eight little-endian bytes.
    pub after_image: Vec<u8>,
}
159
160impl WalRecord {
161 pub fn update(
163 lsn: Lsn,
164 txn_id: TxnId,
165 prev_lsn: Lsn,
166 page_id: PageId,
167 offset: u16,
168 before: Vec<u8>,
169 after: Vec<u8>,
170 ) -> Self {
171 Self {
172 header: WalRecordHeader {
173 lsn,
174 txn_id,
175 record_type: WalRecordType::Update as u8,
176 prev_lsn,
177 page_id,
178 offset,
179 data_length: (before.len() + after.len()) as u32,
180 before_length: before.len() as u16,
181 _reserved: [0; 5],
182 },
183 before_image: before,
184 after_image: after,
185 }
186 }
187
188 pub fn commit(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
190 Self {
191 header: WalRecordHeader {
192 lsn,
193 txn_id,
194 record_type: WalRecordType::Commit as u8,
195 prev_lsn,
196 page_id: 0,
197 offset: 0,
198 data_length: 0,
199 before_length: 0,
200 _reserved: [0; 5],
201 },
202 before_image: Vec::new(),
203 after_image: Vec::new(),
204 }
205 }
206
207 pub fn begin(lsn: Lsn, txn_id: TxnId) -> Self {
209 Self {
210 header: WalRecordHeader {
211 lsn,
212 txn_id,
213 record_type: WalRecordType::Begin as u8,
214 prev_lsn: 0,
215 page_id: 0,
216 offset: 0,
217 data_length: 0,
218 before_length: 0,
219 _reserved: [0; 5],
220 },
221 before_image: Vec::new(),
222 after_image: Vec::new(),
223 }
224 }
225
226 pub fn abort(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
228 Self {
229 header: WalRecordHeader {
230 lsn,
231 txn_id,
232 record_type: WalRecordType::Abort as u8,
233 prev_lsn,
234 page_id: 0,
235 offset: 0,
236 data_length: 0,
237 before_length: 0,
238 _reserved: [0; 5],
239 },
240 before_image: Vec::new(),
241 after_image: Vec::new(),
242 }
243 }
244
245 pub fn clr(
247 lsn: Lsn,
248 txn_id: TxnId,
249 prev_lsn: Lsn,
250 page_id: PageId,
251 offset: u16,
252 undo_next_lsn: Lsn, ) -> Self {
254 Self {
255 header: WalRecordHeader {
256 lsn,
257 txn_id,
258 record_type: WalRecordType::Clr as u8,
259 prev_lsn,
260 page_id,
261 offset,
262 data_length: 8,
263 before_length: 0,
264 _reserved: [0; 5],
265 },
266 before_image: Vec::new(),
267 after_image: undo_next_lsn.to_le_bytes().to_vec(),
268 }
269 }
270
271 pub fn serialize(&self) -> Vec<u8> {
273 let mut buf = Vec::with_capacity(
274 WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4,
275 );
276 buf.extend_from_slice(&self.header.serialize());
277 buf.extend_from_slice(&self.before_image);
278 buf.extend_from_slice(&self.after_image);
279
280 let crc = crc32_of(&buf);
282 buf.extend_from_slice(&crc.to_le_bytes());
283 buf
284 }
285
286 pub fn deserialize(buf: &[u8]) -> Option<Self> {
288 if buf.len() < WalRecordHeader::SIZE + 4 {
289 return None;
290 }
291
292 let header = WalRecordHeader::deserialize(buf)?;
293 let data_start = WalRecordHeader::SIZE;
294 let data_end = data_start + header.data_length as usize;
295
296 if buf.len() < data_end + 4 {
297 return None;
298 }
299
300 let expected_crc = u32::from_le_bytes(buf[data_end..data_end + 4].try_into().ok()?);
302 let actual_crc = crc32_of(&buf[..data_end]);
303 if expected_crc != actual_crc {
304 return None;
305 }
306
307 let before_end = data_start + header.before_length as usize;
308 Some(Self {
309 header,
310 before_image: buf[data_start..before_end].to_vec(),
311 after_image: buf[before_end..data_end].to_vec(),
312 })
313 }
314
315 pub fn size(&self) -> usize {
317 WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4
318 }
319}
320
/// Computes a bitwise CRC-32 over `data` using the reflected polynomial
/// `0xEDB88320`, an all-ones initial value and a final bit inversion.
fn crc32_of(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB88320;

    let state = data.iter().fold(u32::MAX, |acc, &byte| {
        // Mix the byte in, then shift out eight bits one at a time.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 1 {
                shifted ^ POLY
            } else {
                shifted
            }
        })
    });

    !state
}
336
/// In-memory batch of records waiting to be written to the log file.
#[derive(Debug)]
pub struct GroupCommitBuffer {
    /// Records queued since the last flush, in append order.
    records: Vec<WalRecord>,
    /// Sum of `WalRecord::size()` over `records`.
    bytes: usize,
    /// Channels to notify once the batch has been flushed, keyed by transaction.
    waiters: Vec<(TxnId, std::sync::mpsc::Sender<Result<Lsn, WalError>>)>,
    /// Time the buffer was created or last cleared.
    last_flush: Instant,
}
349
350impl GroupCommitBuffer {
351 fn new() -> Self {
352 Self {
353 records: Vec::with_capacity(128),
354 bytes: 0,
355 waiters: Vec::new(),
356 last_flush: Instant::now(),
357 }
358 }
359
360 fn add_record(&mut self, record: WalRecord) {
361 self.bytes += record.size();
362 self.records.push(record);
363 }
364
365 fn add_waiter(
366 &mut self,
367 txn_id: TxnId,
368 sender: std::sync::mpsc::Sender<Result<Lsn, WalError>>,
369 ) {
370 self.waiters.push((txn_id, sender));
371 }
372
373 fn should_flush(&self, config: &WalConfig) -> bool {
374 self.bytes >= config.buffer_size
375 || self.records.len() >= config.max_batch_size
376 || self.last_flush.elapsed() >= config.flush_interval
377 }
378
379 fn clear(&mut self) {
380 self.records.clear();
381 self.bytes = 0;
382 self.waiters.clear();
383 self.last_flush = Instant::now();
384 }
385}
386
/// Tunables for buffering and flushing the write-ahead log.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Flush once the buffered records reach this many serialized bytes.
    pub buffer_size: usize,
    /// Flush once this many records are buffered.
    pub max_batch_size: usize,
    /// Flush when at least this much time has passed since the last flush.
    pub flush_interval: Duration,
    /// How the log file is synced after each batch write.
    pub sync_mode: SyncMode,
    /// NOTE(review): not read anywhere in the visible code; presumably the
    /// number of records between checkpoints — confirm.
    pub checkpoint_interval: u64,
}
401
402impl Default for WalConfig {
403 fn default() -> Self {
404 Self {
405 buffer_size: 1024 * 1024, max_batch_size: 1000,
407 flush_interval: Duration::from_millis(10),
408 sync_mode: SyncMode::Fsync,
409 checkpoint_interval: 100_000,
410 }
411 }
412}
413
/// Durability step performed on the log file after a batch is written.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No explicit sync call after the write.
    None,
    /// Call `File::sync_all` after the write.
    Fsync,
    /// Call `File::sync_data` after the write.
    FdataSync,
}
424
/// Errors reported by the write-ahead log.
#[derive(Debug, Clone)]
pub enum WalError {
    /// An I/O operation failed; carries the stringified cause.
    Io(String),
    /// Stored data failed validation; carries a description.
    Corruption(String),
    /// A record could not be decoded.
    InvalidRecord,
    /// The buffer cannot accept more records.
    BufferFull,
}

impl std::fmt::Display for WalError {
    /// Renders a short human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WalError::InvalidRecord => f.write_str("Invalid WAL record"),
            WalError::BufferFull => f.write_str("WAL buffer full"),
            WalError::Io(detail) => write!(f, "WAL I/O error: {}", detail),
            WalError::Corruption(detail) => write!(f, "WAL corruption: {}", detail),
        }
    }
}

impl std::error::Error for WalError {}
446
/// Append-only write-ahead log with an in-memory group-commit buffer.
#[allow(dead_code)]
pub struct WriteAheadLog {
    /// Directory that holds the `wal_NNNNNNNN.log` files.
    dir: PathBuf,
    /// Handle to the current log file, opened in append mode.
    file: Mutex<File>,
    /// Number embedded in the current log file's name.
    file_number: AtomicU64,
    /// Most recently allocated LSN.
    lsn: AtomicU64,
    /// Records waiting to be written as one batch.
    buffer: Mutex<GroupCommitBuffer>,
    /// NOTE(review): never waited on or notified in the visible code.
    flush_cv: Condvar,
    /// Flush thresholds and sync mode.
    config: WalConfig,
    /// Running counters updated on every flush.
    stats: WalStats,
    /// NOTE(review): set to `true` in `open` and never read in the visible code.
    running: AtomicBool,
    /// LSN of the last record in the most recently flushed batch.
    flushed_lsn: AtomicU64,
    /// Last LSN logged for each transaction that has not committed or aborted.
    txn_prev_lsn: RwLock<HashMap<TxnId, Lsn>>,
}
473
/// Counters describing flush activity; all updated with relaxed atomics.
#[derive(Debug, Default)]
pub struct WalStats {
    /// Total records written to the log file.
    pub records_written: AtomicU64,
    /// Total bytes written to the log file.
    pub bytes_written: AtomicU64,
    /// Number of batch flushes performed.
    pub flushes: AtomicU64,
    /// Sum of batch sizes (in records) over all flushes.
    pub total_batch_records: AtomicU64,
    /// Sum of flush durations in microseconds.
    pub total_flush_time_us: AtomicU64,
}

impl WalStats {
    /// Divides `total` by the flush count, yielding 0.0 before the first flush.
    fn per_flush(&self, total: &AtomicU64) -> f64 {
        match self.flushes.load(Ordering::Relaxed) {
            0 => 0.0,
            flushes => total.load(Ordering::Relaxed) as f64 / flushes as f64,
        }
    }

    /// Average number of records per flush (0.0 when nothing was flushed).
    pub fn avg_batch_size(&self) -> f64 {
        self.per_flush(&self.total_batch_records)
    }

    /// Average flush duration in microseconds (0.0 when nothing was flushed).
    pub fn avg_flush_time_us(&self) -> f64 {
        self.per_flush(&self.total_flush_time_us)
    }
}
506
507impl WriteAheadLog {
508 pub fn open(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self, WalError> {
510 let dir = dir.as_ref().to_path_buf();
511 std::fs::create_dir_all(&dir).map_err(|e| WalError::Io(e.to_string()))?;
512
513 let file_number = Self::find_latest_file(&dir).unwrap_or(0);
515 let file_path = dir.join(format!("wal_{:08}.log", file_number));
516
517 let file = OpenOptions::new()
518 .create(true)
519 .read(true)
520 .append(true)
521 .open(&file_path)
522 .map_err(|e| WalError::Io(e.to_string()))?;
523
524 let lsn = Self::find_last_lsn(&file_path).unwrap_or(0);
526
527 Ok(Self {
528 dir,
529 file: Mutex::new(file),
530 file_number: AtomicU64::new(file_number),
531 lsn: AtomicU64::new(lsn),
532 buffer: Mutex::new(GroupCommitBuffer::new()),
533 flush_cv: Condvar::new(),
534 config,
535 stats: WalStats::default(),
536 running: AtomicBool::new(true),
537 flushed_lsn: AtomicU64::new(lsn),
538 txn_prev_lsn: RwLock::new(HashMap::new()),
539 })
540 }
541
542 fn find_latest_file(dir: &Path) -> Option<u64> {
543 std::fs::read_dir(dir)
544 .ok()?
545 .filter_map(|e| e.ok())
546 .filter_map(|e| {
547 let name = e.file_name().to_string_lossy().to_string();
548 if name.starts_with("wal_") && name.ends_with(".log") {
549 name[4..12].parse::<u64>().ok()
550 } else {
551 None
552 }
553 })
554 .max()
555 }
556
557 fn find_last_lsn(path: &Path) -> Option<Lsn> {
558 let mut file = File::open(path).ok()?;
559 let mut lsn = 0u64;
560 let mut buf = [0u8; WalRecordHeader::SIZE];
561
562 while let Ok(n) = file.read(&mut buf) {
563 if n < WalRecordHeader::SIZE {
564 break;
565 }
566 if let Some(header) = WalRecordHeader::deserialize(&buf) {
567 lsn = header.lsn;
568 let skip = header.data_length as i64 + 4; if file.seek(SeekFrom::Current(skip)).is_err() {
571 break;
572 }
573 } else {
574 break;
575 }
576 }
577
578 Some(lsn)
579 }
580
581 pub fn next_lsn(&self) -> Lsn {
583 self.lsn.fetch_add(1, Ordering::SeqCst) + 1
584 }
585
    /// Returns the most recently allocated LSN without allocating a new one.
    pub fn current_lsn(&self) -> Lsn {
        self.lsn.load(Ordering::SeqCst)
    }
590
    /// Returns the LSN recorded by the most recent successful flush.
    pub fn flushed_lsn(&self) -> Lsn {
        self.flushed_lsn.load(Ordering::Acquire)
    }
595
596 pub fn begin_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
598 let lsn = self.next_lsn();
599 let record = WalRecord::begin(lsn, txn_id);
600
601 {
602 let mut prev_lsn = self.txn_prev_lsn.write().unwrap();
603 prev_lsn.insert(txn_id, lsn);
604 }
605
606 self.append(record)?;
607 Ok(lsn)
608 }
609
610 pub fn log_update(
612 &self,
613 txn_id: TxnId,
614 page_id: PageId,
615 offset: u16,
616 before: Vec<u8>,
617 after: Vec<u8>,
618 ) -> Result<Lsn, WalError> {
619 let lsn = self.next_lsn();
620 let prev_lsn = {
621 let prev_lsn = self.txn_prev_lsn.read().unwrap();
622 prev_lsn.get(&txn_id).copied().unwrap_or(0)
623 };
624
625 let record = WalRecord::update(lsn, txn_id, prev_lsn, page_id, offset, before, after);
626
627 {
628 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap();
629 prev_lsn_map.insert(txn_id, lsn);
630 }
631
632 self.append(record)?;
633 Ok(lsn)
634 }
635
636 pub fn commit_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
638 let lsn = self.next_lsn();
639 let prev_lsn = {
640 let prev_lsn = self.txn_prev_lsn.read().unwrap();
641 prev_lsn.get(&txn_id).copied().unwrap_or(0)
642 };
643
644 let record = WalRecord::commit(lsn, txn_id, prev_lsn);
645
646 let (tx, rx) = std::sync::mpsc::channel();
648
649 {
650 let mut buffer = self.buffer.lock().unwrap();
651 buffer.add_record(record);
652 buffer.add_waiter(txn_id, tx);
653
654 if buffer.should_flush(&self.config) {
655 self.flush_buffer_locked(&mut buffer)?;
656 }
657 }
658
659 {
661 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap();
662 prev_lsn_map.remove(&txn_id);
663 }
664
665 rx.recv()
667 .map_err(|_| WalError::Io("Channel closed".to_string()))?
668 }
669
670 pub fn abort_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
672 let lsn = self.next_lsn();
673 let prev_lsn = {
674 let prev_lsn = self.txn_prev_lsn.read().unwrap();
675 prev_lsn.get(&txn_id).copied().unwrap_or(0)
676 };
677
678 let record = WalRecord::abort(lsn, txn_id, prev_lsn);
679
680 {
681 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap();
682 prev_lsn_map.remove(&txn_id);
683 }
684
685 self.append(record)?;
686 self.force_flush()?;
687 Ok(lsn)
688 }
689
690 fn append(&self, record: WalRecord) -> Result<(), WalError> {
692 let mut buffer = self.buffer.lock().unwrap();
693 buffer.add_record(record);
694
695 if buffer.should_flush(&self.config) {
696 self.flush_buffer_locked(&mut buffer)?;
697 }
698
699 Ok(())
700 }
701
702 pub fn force_flush(&self) -> Result<Lsn, WalError> {
704 let mut buffer = self.buffer.lock().unwrap();
705 if !buffer.records.is_empty() {
706 self.flush_buffer_locked(&mut buffer)?;
707 }
708 Ok(self.flushed_lsn.load(Ordering::Acquire))
709 }
710
711 fn flush_buffer_locked(&self, buffer: &mut GroupCommitBuffer) -> Result<(), WalError> {
713 if buffer.records.is_empty() {
714 return Ok(());
715 }
716
717 let start = Instant::now();
718 let record_count = buffer.records.len() as u64;
719
720 let mut data = Vec::with_capacity(buffer.bytes);
722 let mut last_lsn = 0;
723 for record in &buffer.records {
724 last_lsn = record.header.lsn;
725 data.extend(record.serialize());
726 }
727
728 {
730 let mut file = self.file.lock().unwrap();
731 file.write_all(&data)
732 .map_err(|e| WalError::Io(e.to_string()))?;
733
734 match self.config.sync_mode {
736 SyncMode::Fsync => {
737 file.sync_all().map_err(|e| WalError::Io(e.to_string()))?;
738 }
739 SyncMode::FdataSync => {
740 file.sync_data().map_err(|e| WalError::Io(e.to_string()))?;
741 }
742 SyncMode::None => {}
743 }
744 }
745
746 self.flushed_lsn.store(last_lsn, Ordering::Release);
748
749 let elapsed_us = start.elapsed().as_micros() as u64;
751 self.stats
752 .records_written
753 .fetch_add(record_count, Ordering::Relaxed);
754 self.stats
755 .bytes_written
756 .fetch_add(data.len() as u64, Ordering::Relaxed);
757 self.stats.flushes.fetch_add(1, Ordering::Relaxed);
758 self.stats
759 .total_batch_records
760 .fetch_add(record_count, Ordering::Relaxed);
761 self.stats
762 .total_flush_time_us
763 .fetch_add(elapsed_us, Ordering::Relaxed);
764
765 for (_, sender) in buffer.waiters.drain(..) {
767 let _ = sender.send(Ok(last_lsn));
768 }
769
770 buffer.clear();
771 Ok(())
772 }
773
    /// Returns the live flush counters for this log.
    pub fn stats(&self) -> &WalStats {
        &self.stats
    }
778
779 pub fn recover<R: RecoveryHandler>(&self, handler: &mut R) -> Result<RecoveryStats, WalError> {
781 let start = Instant::now();
782
783 let (dirty_pages, active_txns, last_checkpoint) = self.analysis_pass()?;
785
786 let redo_start = dirty_pages
788 .values()
789 .min()
790 .copied()
791 .unwrap_or(last_checkpoint);
792 let redo_count = self.redo_pass(redo_start, handler)?;
793
794 let undo_count = self.undo_pass(&active_txns, handler)?;
796
797 Ok(RecoveryStats {
798 analysis_time: start.elapsed(),
799 redo_records: redo_count,
800 undo_records: undo_count,
801 dirty_pages: dirty_pages.len(),
802 active_txns: active_txns.len(),
803 })
804 }
805
806 #[allow(clippy::type_complexity)]
808 fn analysis_pass(&self) -> Result<(HashMap<PageId, Lsn>, HashMap<TxnId, Lsn>, Lsn), WalError> {
809 let mut dirty_pages: HashMap<PageId, Lsn> = HashMap::new();
810 let mut active_txns: HashMap<TxnId, Lsn> = HashMap::new();
811 let mut last_checkpoint = 0;
812
813 for record in self.iter_records()? {
814 let record = record?;
815 let lsn = record.header.lsn;
816 let txn_id = record.header.txn_id;
817
818 match WalRecordType::try_from(record.header.record_type) {
819 Ok(WalRecordType::Begin) => {
820 active_txns.insert(txn_id, lsn);
821 }
822 Ok(WalRecordType::Update) => {
823 let page_id = record.header.page_id;
825 dirty_pages.entry(page_id).or_insert(lsn);
826 active_txns.insert(txn_id, lsn);
828 }
829 Ok(WalRecordType::Commit) | Ok(WalRecordType::Abort) | Ok(WalRecordType::End) => {
830 active_txns.remove(&txn_id);
831 }
832 Ok(WalRecordType::Clr) => {
833 active_txns.insert(txn_id, lsn);
834 }
835 Ok(WalRecordType::Checkpoint) => {
836 last_checkpoint = lsn;
837 }
838 Err(_) => {}
839 }
840 }
841
842 Ok((dirty_pages, active_txns, last_checkpoint))
843 }
844
845 fn redo_pass<R: RecoveryHandler>(
847 &self,
848 start_lsn: Lsn,
849 handler: &mut R,
850 ) -> Result<u64, WalError> {
851 let mut count = 0;
852
853 for record in self.iter_records_from(start_lsn)? {
854 let record = record?;
855
856 match WalRecordType::try_from(record.header.record_type) {
857 Ok(WalRecordType::Update) => {
858 handler.redo(&record)?;
859 count += 1;
860 }
861 Ok(WalRecordType::Clr) => {
862 count += 1;
864 }
865 _ => {}
866 }
867 }
868
869 Ok(count)
870 }
871
872 fn undo_pass<R: RecoveryHandler>(
874 &self,
875 active_txns: &HashMap<TxnId, Lsn>,
876 handler: &mut R,
877 ) -> Result<u64, WalError> {
878 let mut count = 0;
879
880 let mut undo_list: VecDeque<(Lsn, TxnId)> = active_txns
882 .iter()
883 .map(|(&txn_id, &lsn)| (lsn, txn_id))
884 .collect();
885 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
886
887 while let Some((lsn, txn_id)) = undo_list.pop_front() {
888 if lsn == 0 {
889 continue;
890 }
891
892 if let Some(record) = self.read_record_at(lsn)? {
894 match WalRecordType::try_from(record.header.record_type) {
895 Ok(WalRecordType::Update) => {
896 handler.undo(&record)?;
898 count += 1;
899
900 let clr_lsn = self.next_lsn();
902 let clr = WalRecord::clr(
903 clr_lsn,
904 txn_id,
905 lsn,
906 record.header.page_id,
907 record.header.offset,
908 record.header.prev_lsn,
909 );
910 self.append(clr)?;
911
912 if record.header.prev_lsn > 0 {
914 undo_list.push_back((record.header.prev_lsn, txn_id));
915 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
916 }
917 }
918 Ok(WalRecordType::Clr) => {
919 if record.after_image.len() >= 8 {
921 let undo_next =
922 u64::from_le_bytes(record.after_image[0..8].try_into().unwrap());
923 if undo_next > 0 {
924 undo_list.push_back((undo_next, txn_id));
925 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
926 }
927 }
928 }
929 _ => {
930 if record.header.prev_lsn > 0 {
932 undo_list.push_back((record.header.prev_lsn, txn_id));
933 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
934 }
935 }
936 }
937 }
938 }
939
940 self.force_flush()?;
941 Ok(count)
942 }
943
944 fn read_record_at(&self, target_lsn: Lsn) -> Result<Option<WalRecord>, WalError> {
946 for record in self.iter_records()? {
947 let record = record?;
948 if record.header.lsn == target_lsn {
949 return Ok(Some(record));
950 }
951 if record.header.lsn > target_lsn {
952 break;
953 }
954 }
955 Ok(None)
956 }
957
    /// Iterates over every record in the current log file (start LSN 0).
    fn iter_records(&self) -> Result<WalIterator, WalError> {
        self.iter_records_from(0)
    }
962
963 fn iter_records_from(&self, start_lsn: Lsn) -> Result<WalIterator, WalError> {
965 let file_path = self.dir.join(format!(
966 "wal_{:08}.log",
967 self.file_number.load(Ordering::Relaxed)
968 ));
969
970 let file = File::open(&file_path).map_err(|e| WalError::Io(e.to_string()))?;
971
972 Ok(WalIterator {
973 file,
974 start_lsn,
975 started: false,
976 })
977 }
978}
979
/// Callbacks that apply log records to storage during recovery.
pub trait RecoveryHandler {
    /// Called for each `Update` record replayed by the redo pass.
    fn redo(&mut self, record: &WalRecord) -> Result<(), WalError>;
    /// Called for each `Update` record rolled back by the undo pass.
    fn undo(&mut self, record: &WalRecord) -> Result<(), WalError>;
}
985
/// Recovery handler that accepts every record and does nothing with it.
pub struct NoOpRecoveryHandler;

impl RecoveryHandler for NoOpRecoveryHandler {
    /// Ignores the record and reports success.
    fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
        Ok(())
    }
    /// Ignores the record and reports success.
    fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
        Ok(())
    }
}
997
/// Summary returned by `WriteAheadLog::recover`.
#[derive(Debug, Clone)]
pub struct RecoveryStats {
    /// Elapsed time from the start of `recover` until all passes finished
    /// (despite the name, it is not limited to the analysis pass).
    pub analysis_time: Duration,
    /// Count returned by the redo pass (`Update` and `Clr` records).
    pub redo_records: u64,
    /// Number of `Update` records undone by the undo pass.
    pub undo_records: u64,
    /// Number of distinct pages found dirty by the analysis pass.
    pub dirty_pages: usize,
    /// Number of transactions still active at the end of the analysis pass.
    pub active_txns: usize,
}
1007
/// Sequential reader over the records of one log file.
pub struct WalIterator {
    /// Open handle positioned at the next record header.
    file: File,
    /// Records with a smaller LSN are skipped until the first match.
    start_lsn: Lsn,
    /// Set once a record with an LSN of at least `start_lsn` has been seen.
    started: bool,
}
1014
1015impl Iterator for WalIterator {
1016 type Item = Result<WalRecord, WalError>;
1017
1018 fn next(&mut self) -> Option<Self::Item> {
1019 let mut header_buf = [0u8; WalRecordHeader::SIZE];
1020
1021 match self.file.read_exact(&mut header_buf) {
1022 Ok(()) => {}
1023 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
1024 Err(e) => return Some(Err(WalError::Io(e.to_string()))),
1025 }
1026
1027 let header = match WalRecordHeader::deserialize(&header_buf) {
1028 Some(h) => h,
1029 None => return Some(Err(WalError::InvalidRecord)),
1030 };
1031
1032 if !self.started {
1034 if header.lsn < self.start_lsn {
1035 let skip = header.data_length as i64 + 4;
1037 if let Err(e) = self.file.seek(SeekFrom::Current(skip)) {
1038 return Some(Err(WalError::Io(e.to_string())));
1039 }
1040 return self.next();
1041 }
1042 self.started = true;
1043 }
1044
1045 let data_len = header.data_length as usize;
1047 let mut data_buf = vec![0u8; data_len + 4]; match self.file.read_exact(&mut data_buf) {
1050 Ok(()) => {}
1051 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
1052 Err(e) => return Some(Err(WalError::Io(e.to_string()))),
1053 }
1054
1055 let mut full_buf = Vec::with_capacity(WalRecordHeader::SIZE + data_len);
1057 full_buf.extend_from_slice(&header_buf);
1058 full_buf.extend_from_slice(&data_buf[..data_len]);
1059
1060 let expected_crc = u32::from_le_bytes(data_buf[data_len..data_len + 4].try_into().unwrap());
1061 let actual_crc = crc32_of(&full_buf);
1062
1063 if expected_crc != actual_crc {
1064 return Some(Err(WalError::Corruption("CRC mismatch".to_string())));
1065 }
1066
1067 let before_end = header.before_length as usize;
1068 Some(Ok(WalRecord {
1069 header,
1070 before_image: data_buf[..before_end].to_vec(),
1071 after_image: data_buf[before_end..data_len].to_vec(),
1072 }))
1073 }
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use super::*;
1079 use std::sync::atomic::AtomicU64;
1080 use tempfile::TempDir;
1081 use sochdb_core::ValidityBitmap;
1082
1083 #[test]
1084 fn test_wal_record_serialization() {
1085 let record = WalRecord::update(1, 100, 0, 1000, 0, vec![1, 2, 3, 4], vec![5, 6, 7, 8]);
1086
1087 let serialized = record.serialize();
1088 let deserialized = WalRecord::deserialize(&serialized).unwrap();
1089
1090 let lsn = deserialized.header.lsn;
1092 let txn_id = deserialized.header.txn_id;
1093
1094 assert_eq!(lsn, 1);
1095 assert_eq!(txn_id, 100);
1096 assert_eq!(deserialized.before_image, vec![1, 2, 3, 4]);
1097 assert_eq!(deserialized.after_image, vec![5, 6, 7, 8]);
1098 }
1099
1100 #[test]
1101 #[ignore] fn test_wal_basic_operations() {
1103 let dir = TempDir::new().unwrap();
1104 let config = WalConfig {
1105 sync_mode: SyncMode::None, ..Default::default()
1107 };
1108
1109 let wal = WriteAheadLog::open(dir.path(), config).unwrap();
1110
1111 let begin_lsn = wal.begin_txn(1).unwrap();
1113 assert!(begin_lsn > 0);
1114
1115 let update_lsn = wal.log_update(1, 100, 0, vec![0; 10], vec![1; 10]).unwrap();
1117 assert!(update_lsn > begin_lsn);
1118
1119 let commit_lsn = wal.commit_txn(1).unwrap();
1121 assert!(commit_lsn > update_lsn);
1122
1123 assert!(wal.stats().records_written.load(Ordering::Relaxed) >= 3);
1125 }
1126
1127 #[test]
1128 #[ignore] fn test_wal_group_commit() {
1130 let dir = TempDir::new().unwrap();
1131 let config = WalConfig {
1132 sync_mode: SyncMode::None,
1133 buffer_size: 10000, max_batch_size: 100,
1135 flush_interval: Duration::from_secs(10), ..Default::default()
1137 };
1138
1139 let wal = WriteAheadLog::open(dir.path(), config).unwrap();
1140
1141 for i in 0..10 {
1143 wal.begin_txn(i).unwrap();
1144 wal.log_update(i, 100 + i, 0, vec![0; 10], vec![1; 10])
1145 .unwrap();
1146 }
1147
1148 wal.force_flush().unwrap();
1150
1151 let stats = wal.stats();
1152 let flushes = stats.flushes.load(Ordering::Relaxed);
1153 let records = stats.records_written.load(Ordering::Relaxed);
1154
1155 assert!(records >= 20); println!(
1158 "Flushes: {}, Records: {}, Avg batch: {:.1}",
1159 flushes,
1160 records,
1161 stats.avg_batch_size()
1162 );
1163 }
1164
1165 #[test]
1166 fn test_crc32() {
1167 let data = b"hello world";
1168 let crc = crc32_of(data);
1169 assert_ne!(crc, 0);
1170
1171 let crc2 = crc32_of(data);
1173 assert_eq!(crc, crc2);
1174
1175 let data2 = b"hello World"; let crc3 = crc32_of(data2);
1178 assert_ne!(crc, crc3);
1179 }
1180
1181 #[test]
1182 #[ignore] fn test_wal_iterator() {
1184 let dir = TempDir::new().unwrap();
1185 let config = WalConfig {
1186 sync_mode: SyncMode::None,
1187 ..Default::default()
1188 };
1189
1190 let wal = WriteAheadLog::open(dir.path(), config).unwrap();
1191
1192 wal.begin_txn(1).unwrap();
1194 wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
1195 .unwrap();
1196 wal.log_update(1, 101, 0, vec![7, 8, 9], vec![10, 11, 12])
1197 .unwrap();
1198 wal.force_flush().unwrap();
1199
1200 let count = wal.iter_records().unwrap().count();
1202 assert_eq!(count, 3); }
1204
1205 #[test]
1206 #[ignore] fn test_wal_persistence() {
1208 let dir = TempDir::new().unwrap();
1209 let config = WalConfig {
1210 sync_mode: SyncMode::None,
1211 ..Default::default()
1212 };
1213
1214 {
1216 let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();
1217 wal.begin_txn(1).unwrap();
1218 wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
1219 .unwrap();
1220 wal.commit_txn(1).unwrap();
1221 }
1222
1223 {
1225 let wal = WriteAheadLog::open(dir.path(), config).unwrap();
1226 let count = wal.iter_records().unwrap().count();
1227 assert_eq!(count, 3); }
1229 }
1230
1231 #[test]
1232 #[ignore] fn test_wal_recovery_analysis() {
1234 let dir = TempDir::new().unwrap();
1235 let config = WalConfig {
1236 sync_mode: SyncMode::None,
1237 ..Default::default()
1238 };
1239
1240 let wal = WriteAheadLog::open(dir.path(), config).unwrap();
1241
1242 wal.begin_txn(1).unwrap();
1244 wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
1245 wal.commit_txn(1).unwrap();
1246
1247 wal.begin_txn(2).unwrap();
1249 wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
1250 wal.force_flush().unwrap();
1251
1252 let (dirty_pages, active_txns, _) = wal.analysis_pass().unwrap();
1254
1255 assert!(!active_txns.contains_key(&1)); assert!(active_txns.contains_key(&2)); assert!(dirty_pages.contains_key(&200)); }
1259
1260 struct TestRecoveryHandler {
1261 redo_count: AtomicU64,
1262 undo_count: AtomicU64,
1263 }
1264
1265 impl RecoveryHandler for TestRecoveryHandler {
1266 fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
1267 self.redo_count.fetch_add(1, Ordering::Relaxed);
1268 Ok(())
1269 }
1270 fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
1271 self.undo_count.fetch_add(1, Ordering::Relaxed);
1272 Ok(())
1273 }
1274 }
1275
1276 #[test]
1277 #[ignore] fn test_wal_full_recovery() {
1279 let dir = TempDir::new().unwrap();
1280 let config = WalConfig {
1281 sync_mode: SyncMode::None,
1282 ..Default::default()
1283 };
1284
1285 {
1287 let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();
1288
1289 wal.begin_txn(1).unwrap();
1291 wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
1292 wal.commit_txn(1).unwrap();
1293
1294 wal.begin_txn(2).unwrap();
1296 wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
1297 wal.log_update(2, 201, 0, vec![9, 10], vec![11, 12])
1298 .unwrap();
1299 wal.force_flush().unwrap();
1300 }
1302
1303 {
1305 let wal = WriteAheadLog::open(dir.path(), config).unwrap();
1306 let mut handler = TestRecoveryHandler {
1307 redo_count: AtomicU64::new(0),
1308 undo_count: AtomicU64::new(0),
1309 };
1310
1311 let stats = wal.recover(&mut handler).unwrap();
1312
1313 assert_eq!(stats.redo_records, 3);
1315
1316 assert_eq!(stats.undo_records, 2);
1318
1319 assert_eq!(stats.active_txns, 1);
1321 }
1322 }
1323
1324 #[test]
1325 #[ignore]
1326 fn test_validity_bitmap() {
1327 let mut bitmap = ValidityBitmap::new_all_valid(100);
1328 assert_eq!(bitmap.len(), 100);
1329 assert_eq!(bitmap.null_count(), 0);
1330
1331 for i in 0..100 {
1332 assert!(bitmap.is_valid(i));
1333 }
1334
1335 bitmap.set_null(10);
1337 bitmap.set_null(50);
1338 bitmap.set_null(99);
1339
1340 assert_eq!(bitmap.null_count(), 3);
1341 assert!(!bitmap.is_valid(10));
1342 assert!(!bitmap.is_valid(50));
1343 assert!(!bitmap.is_valid(99));
1344 assert!(bitmap.is_valid(11));
1345 }
1346}