1use std::collections::{HashMap, VecDeque};
36use std::fs::{File, OpenOptions};
37use std::io::{self, Read, Seek, SeekFrom, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{Condvar, Mutex, RwLock};
41use std::time::{Duration, Instant};
42
/// Log sequence number identifying a WAL record; `0` is used as "no record".
pub type Lsn = u64;

/// Identifier of the transaction that owns a WAL record.
pub type TxnId = u64;

/// Identifier of the page an update record touches.
pub type PageId = u64;
51
/// Kind of a WAL record, stored on disk as the one-byte `record_type` header field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// A page change carrying before/after images.
    Update = 1,
    /// Transaction commit.
    Commit = 2,
    /// Transaction abort.
    Abort = 3,
    /// Compensation log record written while undoing an update.
    Clr = 4,
    /// Checkpoint marker.
    Checkpoint = 5,
    /// Transaction start.
    Begin = 6,
    /// End-of-transaction marker.
    End = 7,
}

impl TryFrom<u8> for WalRecordType {
    /// Unknown discriminants are rejected without further detail.
    type Error = ();

    /// Decodes a stored `record_type` byte; any value outside `1..=7` is an error.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        let known = [
            Self::Update,
            Self::Commit,
            Self::Abort,
            Self::Clr,
            Self::Checkpoint,
            Self::Begin,
            Self::End,
        ];
        known
            .iter()
            .copied()
            .find(|&kind| kind as u8 == value)
            .ok_or(())
    }
}
88
/// Fixed-size header at the start of every serialized WAL record.
///
/// On disk the fields are little-endian and laid out back to back in declaration order,
/// occupying bytes `0..41`. The remaining bytes up to [`WalRecordHeader::SIZE`] are written
/// as zero. `_reserved` is not serialized.
///
/// The struct is `#[repr(packed)]`, so fields must be copied out (e.g. `{ header.lsn }`)
/// rather than borrowed in place.
#[derive(Debug, Clone)]
#[repr(C, packed)]
pub struct WalRecordHeader {
    /// LSN of this record.
    pub lsn: u64,
    /// Transaction that wrote the record.
    pub txn_id: u64,
    /// A `WalRecordType` discriminant.
    pub record_type: u8,
    /// Previous record of the same transaction (`0` when there is none).
    pub prev_lsn: u64,
    /// Page touched by the record (`0` for control records).
    pub page_id: u64,
    /// Byte offset of the change within the page.
    pub offset: u16,
    /// Length of the payload that follows the header (before + after images), excluding the CRC.
    pub data_length: u32,
    /// Length of the before image at the start of the payload.
    pub before_length: u16,
    _reserved: [u8; 5],
}

impl WalRecordHeader {
    /// Size of a serialized header in bytes; bytes past the encoded fields are zero padding.
    pub const SIZE: usize = 48;

    /// Encodes the header into its fixed on-disk form.
    pub fn serialize(&self) -> [u8; Self::SIZE] {
        // Field encodings in on-disk order; each is copied out of the packed struct by value.
        let fields: [&[u8]; 8] = [
            &self.lsn.to_le_bytes(),
            &self.txn_id.to_le_bytes(),
            &[self.record_type],
            &self.prev_lsn.to_le_bytes(),
            &self.page_id.to_le_bytes(),
            &self.offset.to_le_bytes(),
            &self.data_length.to_le_bytes(),
            &self.before_length.to_le_bytes(),
        ];

        let mut out = [0u8; Self::SIZE];
        let mut cursor = 0;
        for &field in fields.iter() {
            out[cursor..cursor + field.len()].copy_from_slice(field);
            cursor += field.len();
        }
        out
    }

    /// Decodes a header from the first [`Self::SIZE`] bytes of `buf`.
    ///
    /// Returns `None` if `buf` is shorter than [`Self::SIZE`]. No field is validated, and the
    /// reserved bytes are always zeroed.
    pub fn deserialize(buf: &[u8]) -> Option<Self> {
        if buf.len() < Self::SIZE {
            return None;
        }
        Some(Self {
            lsn: u64::from_le_bytes(Self::array_at(buf, 0)?),
            txn_id: u64::from_le_bytes(Self::array_at(buf, 8)?),
            record_type: buf[16],
            prev_lsn: u64::from_le_bytes(Self::array_at(buf, 17)?),
            page_id: u64::from_le_bytes(Self::array_at(buf, 25)?),
            offset: u16::from_le_bytes(Self::array_at(buf, 33)?),
            data_length: u32::from_le_bytes(Self::array_at(buf, 35)?),
            before_length: u16::from_le_bytes(Self::array_at(buf, 39)?),
            _reserved: [0; 5],
        })
    }

    /// Copies the `N` bytes of `buf` starting at `at`, or `None` if `buf` is too short.
    fn array_at<const N: usize>(buf: &[u8], at: usize) -> Option<[u8; N]> {
        buf.get(at..at + N)?.try_into().ok()
    }
}
146
147#[derive(Debug, Clone)]
149pub struct WalRecord {
150 pub header: WalRecordHeader,
151 pub before_image: Vec<u8>,
153 pub after_image: Vec<u8>,
155}
156
157impl WalRecord {
158 pub fn update(
160 lsn: Lsn,
161 txn_id: TxnId,
162 prev_lsn: Lsn,
163 page_id: PageId,
164 offset: u16,
165 before: Vec<u8>,
166 after: Vec<u8>,
167 ) -> Self {
168 Self {
169 header: WalRecordHeader {
170 lsn,
171 txn_id,
172 record_type: WalRecordType::Update as u8,
173 prev_lsn,
174 page_id,
175 offset,
176 data_length: (before.len() + after.len()) as u32,
177 before_length: before.len() as u16,
178 _reserved: [0; 5],
179 },
180 before_image: before,
181 after_image: after,
182 }
183 }
184
185 pub fn commit(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
187 Self {
188 header: WalRecordHeader {
189 lsn,
190 txn_id,
191 record_type: WalRecordType::Commit as u8,
192 prev_lsn,
193 page_id: 0,
194 offset: 0,
195 data_length: 0,
196 before_length: 0,
197 _reserved: [0; 5],
198 },
199 before_image: Vec::new(),
200 after_image: Vec::new(),
201 }
202 }
203
204 pub fn begin(lsn: Lsn, txn_id: TxnId) -> Self {
206 Self {
207 header: WalRecordHeader {
208 lsn,
209 txn_id,
210 record_type: WalRecordType::Begin as u8,
211 prev_lsn: 0,
212 page_id: 0,
213 offset: 0,
214 data_length: 0,
215 before_length: 0,
216 _reserved: [0; 5],
217 },
218 before_image: Vec::new(),
219 after_image: Vec::new(),
220 }
221 }
222
223 pub fn abort(lsn: Lsn, txn_id: TxnId, prev_lsn: Lsn) -> Self {
225 Self {
226 header: WalRecordHeader {
227 lsn,
228 txn_id,
229 record_type: WalRecordType::Abort as u8,
230 prev_lsn,
231 page_id: 0,
232 offset: 0,
233 data_length: 0,
234 before_length: 0,
235 _reserved: [0; 5],
236 },
237 before_image: Vec::new(),
238 after_image: Vec::new(),
239 }
240 }
241
242 pub fn clr(
244 lsn: Lsn,
245 txn_id: TxnId,
246 prev_lsn: Lsn,
247 page_id: PageId,
248 offset: u16,
249 undo_next_lsn: Lsn, ) -> Self {
251 Self {
252 header: WalRecordHeader {
253 lsn,
254 txn_id,
255 record_type: WalRecordType::Clr as u8,
256 prev_lsn,
257 page_id,
258 offset,
259 data_length: 8,
260 before_length: 0,
261 _reserved: [0; 5],
262 },
263 before_image: Vec::new(),
264 after_image: undo_next_lsn.to_le_bytes().to_vec(),
265 }
266 }
267
268 pub fn serialize(&self) -> Vec<u8> {
270 let mut buf = Vec::with_capacity(
271 WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4,
272 );
273 buf.extend_from_slice(&self.header.serialize());
274 buf.extend_from_slice(&self.before_image);
275 buf.extend_from_slice(&self.after_image);
276
277 let crc = crc32_of(&buf);
279 buf.extend_from_slice(&crc.to_le_bytes());
280 buf
281 }
282
283 pub fn deserialize(buf: &[u8]) -> Option<Self> {
285 if buf.len() < WalRecordHeader::SIZE + 4 {
286 return None;
287 }
288
289 let header = WalRecordHeader::deserialize(buf)?;
290 let data_start = WalRecordHeader::SIZE;
291 let data_end = data_start + header.data_length as usize;
292
293 if buf.len() < data_end + 4 {
294 return None;
295 }
296
297 let expected_crc = u32::from_le_bytes(buf[data_end..data_end + 4].try_into().ok()?);
299 let actual_crc = crc32_of(&buf[..data_end]);
300 if expected_crc != actual_crc {
301 return None;
302 }
303
304 let before_end = data_start + header.before_length as usize;
305 Some(Self {
306 header,
307 before_image: buf[data_start..before_end].to_vec(),
308 after_image: buf[before_end..data_end].to_vec(),
309 })
310 }
311
312 pub fn size(&self) -> usize {
314 WalRecordHeader::SIZE + self.before_image.len() + self.after_image.len() + 4
315 }
316}
317
/// Computes the standard CRC-32 (reflected polynomial `0xEDB88320`, initial value and
/// final XOR `0xFFFFFFFF`) of `data`.
///
/// This is a bit-at-a-time implementation. Its check value for `b"123456789"` is
/// `0xCBF43926`.
fn crc32_of(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |reg, _| {
            // All-ones when the low bit is set, zero otherwise: selects whether POLY is applied.
            let mask = (reg & 1).wrapping_neg();
            (reg >> 1) ^ (POLY & mask)
        })
    });
    !register
}
333
334#[derive(Debug)]
336pub struct GroupCommitBuffer {
337 records: Vec<WalRecord>,
339 bytes: usize,
341 waiters: Vec<(TxnId, std::sync::mpsc::Sender<Result<Lsn, WalError>>)>,
343 last_flush: Instant,
345}
346
347impl GroupCommitBuffer {
348 fn new() -> Self {
349 Self {
350 records: Vec::with_capacity(128),
351 bytes: 0,
352 waiters: Vec::new(),
353 last_flush: Instant::now(),
354 }
355 }
356
357 fn add_record(&mut self, record: WalRecord) {
358 self.bytes += record.size();
359 self.records.push(record);
360 }
361
362 fn add_waiter(
363 &mut self,
364 txn_id: TxnId,
365 sender: std::sync::mpsc::Sender<Result<Lsn, WalError>>,
366 ) {
367 self.waiters.push((txn_id, sender));
368 }
369
370 fn should_flush(&self, config: &WalConfig) -> bool {
371 self.bytes >= config.buffer_size
372 || self.records.len() >= config.max_batch_size
373 || self.last_flush.elapsed() >= config.flush_interval
374 }
375
376 fn clear(&mut self) {
377 self.records.clear();
378 self.bytes = 0;
379 self.waiters.clear();
380 self.last_flush = Instant::now();
381 }
382}
383
/// Tuning parameters for the write-ahead log.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Flush once buffered records reach this many serialized bytes (default 1 MiB).
    pub buffer_size: usize,
    /// Flush once this many records are buffered (default 1000).
    pub max_batch_size: usize,
    /// Flush once this much time has passed since the last flush (default 10 ms).
    pub flush_interval: Duration,
    /// How written data is pushed to stable storage (default [`SyncMode::Fsync`]).
    pub sync_mode: SyncMode,
    /// Default 100 000. NOTE(review): not read by the WAL code in this file.
    pub checkpoint_interval: u64,
}

impl Default for WalConfig {
    fn default() -> Self {
        Self {
            buffer_size: 1 << 20,
            max_batch_size: 1_000,
            flush_interval: Duration::from_millis(10),
            sync_mode: SyncMode::Fsync,
            checkpoint_interval: 100_000,
        }
    }
}

/// Durability step performed after each batch is written to the log file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Write only; no explicit sync.
    None,
    /// Sync data and metadata (`File::sync_all`).
    Fsync,
    /// Sync data only (`File::sync_data`).
    FdataSync,
}
421
/// Errors produced by the write-ahead log.
#[derive(Debug, Clone)]
pub enum WalError {
    /// An I/O operation (or internal notification channel) failed; carries a description.
    Io(String),
    /// Stored data failed validation, e.g. a CRC mismatch.
    Corruption(String),
    /// A record header could not be decoded.
    InvalidRecord,
    /// NOTE(review): not produced by the code in this file.
    BufferFull,
}

impl std::fmt::Display for WalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(detail) => write!(f, "WAL I/O error: {}", detail),
            Self::Corruption(detail) => write!(f, "WAL corruption: {}", detail),
            Self::InvalidRecord => f.write_str("Invalid WAL record"),
            Self::BufferFull => f.write_str("WAL buffer full"),
        }
    }
}

impl std::error::Error for WalError {}
443
444#[allow(dead_code)]
446pub struct WriteAheadLog {
447 dir: PathBuf,
449 file: Mutex<File>,
451 file_number: AtomicU64,
453 lsn: AtomicU64,
455 buffer: Mutex<GroupCommitBuffer>,
457 flush_cv: Condvar,
459 config: WalConfig,
461 stats: WalStats,
463 running: AtomicBool,
465 flushed_lsn: AtomicU64,
467 txn_prev_lsn: RwLock<HashMap<TxnId, Lsn>>,
469}
470
/// Cumulative WAL write statistics; all counters are updated with relaxed atomics.
#[derive(Debug, Default)]
pub struct WalStats {
    /// Records written to the log file.
    pub records_written: AtomicU64,
    /// Serialized bytes written to the log file.
    pub bytes_written: AtomicU64,
    /// Number of completed flushes.
    pub flushes: AtomicU64,
    /// Records written across all flushes (numerator of the average batch size).
    pub total_batch_records: AtomicU64,
    /// Microseconds spent in flushes (numerator of the average flush time).
    pub total_flush_time_us: AtomicU64,
}

impl WalStats {
    /// Mean number of records per flush, or `0.0` before the first flush.
    pub fn avg_batch_size(&self) -> f64 {
        self.per_flush(&self.total_batch_records)
    }

    /// Mean flush duration in microseconds, or `0.0` before the first flush.
    pub fn avg_flush_time_us(&self) -> f64 {
        self.per_flush(&self.total_flush_time_us)
    }

    /// Divides `total` by the flush count, treating "no flushes yet" as zero.
    fn per_flush(&self, total: &AtomicU64) -> f64 {
        match self.flushes.load(Ordering::Relaxed) {
            0 => 0.0,
            flushes => total.load(Ordering::Relaxed) as f64 / flushes as f64,
        }
    }
}
503
504impl WriteAheadLog {
505 pub fn open(dir: impl AsRef<Path>, config: WalConfig) -> Result<Self, WalError> {
507 let dir = dir.as_ref().to_path_buf();
508 std::fs::create_dir_all(&dir).map_err(|e| WalError::Io(e.to_string()))?;
509
510 let file_number = Self::find_latest_file(&dir).unwrap_or(0);
512 let file_path = dir.join(format!("wal_{:08}.log", file_number));
513
514 let file = OpenOptions::new()
515 .create(true)
516 .read(true)
517 .append(true)
518 .open(&file_path)
519 .map_err(|e| WalError::Io(e.to_string()))?;
520
521 let lsn = Self::find_last_lsn(&file_path).unwrap_or(0);
523
524 Ok(Self {
525 dir,
526 file: Mutex::new(file),
527 file_number: AtomicU64::new(file_number),
528 lsn: AtomicU64::new(lsn),
529 buffer: Mutex::new(GroupCommitBuffer::new()),
530 flush_cv: Condvar::new(),
531 config,
532 stats: WalStats::default(),
533 running: AtomicBool::new(true),
534 flushed_lsn: AtomicU64::new(lsn),
535 txn_prev_lsn: RwLock::new(HashMap::new()),
536 })
537 }
538
539 fn find_latest_file(dir: &Path) -> Option<u64> {
540 std::fs::read_dir(dir)
541 .ok()?
542 .filter_map(|e| e.ok())
543 .filter_map(|e| {
544 let name = e.file_name().to_string_lossy().to_string();
545 if name.starts_with("wal_") && name.ends_with(".log") {
546 name[4..12].parse::<u64>().ok()
547 } else {
548 None
549 }
550 })
551 .max()
552 }
553
554 fn find_last_lsn(path: &Path) -> Option<Lsn> {
555 let mut file = File::open(path).ok()?;
556 let mut lsn = 0u64;
557 let mut buf = [0u8; WalRecordHeader::SIZE];
558
559 while let Ok(n) = file.read(&mut buf) {
560 if n < WalRecordHeader::SIZE {
561 break;
562 }
563 if let Some(header) = WalRecordHeader::deserialize(&buf) {
564 lsn = header.lsn;
565 let skip = header.data_length as i64 + 4; if file.seek(SeekFrom::Current(skip)).is_err() {
568 break;
569 }
570 } else {
571 break;
572 }
573 }
574
575 Some(lsn)
576 }
577
578 pub fn next_lsn(&self) -> Lsn {
580 self.lsn.fetch_add(1, Ordering::SeqCst) + 1
581 }
582
583 pub fn current_lsn(&self) -> Lsn {
585 self.lsn.load(Ordering::SeqCst)
586 }
587
588 pub fn flushed_lsn(&self) -> Lsn {
590 self.flushed_lsn.load(Ordering::Acquire)
591 }
592
593 pub fn begin_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
595 let lsn = self.next_lsn();
596 let record = WalRecord::begin(lsn, txn_id);
597
598 {
599 let mut prev_lsn = self.txn_prev_lsn.write().unwrap();
600 prev_lsn.insert(txn_id, lsn);
601 }
602
603 self.append(record)?;
604 Ok(lsn)
605 }
606
607 pub fn log_update(
609 &self,
610 txn_id: TxnId,
611 page_id: PageId,
612 offset: u16,
613 before: Vec<u8>,
614 after: Vec<u8>,
615 ) -> Result<Lsn, WalError> {
616 let lsn = self.next_lsn();
617 let prev_lsn = {
618 let prev_lsn = self.txn_prev_lsn.read().unwrap();
619 prev_lsn.get(&txn_id).copied().unwrap_or(0)
620 };
621
622 let record = WalRecord::update(lsn, txn_id, prev_lsn, page_id, offset, before, after);
623
624 {
625 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap();
626 prev_lsn_map.insert(txn_id, lsn);
627 }
628
629 self.append(record)?;
630 Ok(lsn)
631 }
632
633 pub fn commit_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
635 let lsn = self.next_lsn();
636 let prev_lsn = {
637 let prev_lsn = self.txn_prev_lsn.read().unwrap();
638 prev_lsn.get(&txn_id).copied().unwrap_or(0)
639 };
640
641 let record = WalRecord::commit(lsn, txn_id, prev_lsn);
642
643 let (tx, rx) = std::sync::mpsc::channel();
645
646 {
647 let mut buffer = self.buffer.lock().unwrap();
648 buffer.add_record(record);
649 buffer.add_waiter(txn_id, tx);
650
651 if buffer.should_flush(&self.config) {
652 self.flush_buffer_locked(&mut buffer)?;
653 }
654 }
655
656 {
658 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap();
659 prev_lsn_map.remove(&txn_id);
660 }
661
662 rx.recv()
664 .map_err(|_| WalError::Io("Channel closed".to_string()))?
665 }
666
667 pub fn abort_txn(&self, txn_id: TxnId) -> Result<Lsn, WalError> {
669 let lsn = self.next_lsn();
670 let prev_lsn = {
671 let prev_lsn = self.txn_prev_lsn.read().unwrap();
672 prev_lsn.get(&txn_id).copied().unwrap_or(0)
673 };
674
675 let record = WalRecord::abort(lsn, txn_id, prev_lsn);
676
677 {
678 let mut prev_lsn_map = self.txn_prev_lsn.write().unwrap();
679 prev_lsn_map.remove(&txn_id);
680 }
681
682 self.append(record)?;
683 self.force_flush()?;
684 Ok(lsn)
685 }
686
687 fn append(&self, record: WalRecord) -> Result<(), WalError> {
689 let mut buffer = self.buffer.lock().unwrap();
690 buffer.add_record(record);
691
692 if buffer.should_flush(&self.config) {
693 self.flush_buffer_locked(&mut buffer)?;
694 }
695
696 Ok(())
697 }
698
699 pub fn force_flush(&self) -> Result<Lsn, WalError> {
701 let mut buffer = self.buffer.lock().unwrap();
702 if !buffer.records.is_empty() {
703 self.flush_buffer_locked(&mut buffer)?;
704 }
705 Ok(self.flushed_lsn.load(Ordering::Acquire))
706 }
707
708 fn flush_buffer_locked(&self, buffer: &mut GroupCommitBuffer) -> Result<(), WalError> {
710 if buffer.records.is_empty() {
711 return Ok(());
712 }
713
714 let start = Instant::now();
715 let record_count = buffer.records.len() as u64;
716
717 let mut data = Vec::with_capacity(buffer.bytes);
719 let mut last_lsn = 0;
720 for record in &buffer.records {
721 last_lsn = record.header.lsn;
722 data.extend(record.serialize());
723 }
724
725 {
727 let mut file = self.file.lock().unwrap();
728 file.write_all(&data)
729 .map_err(|e| WalError::Io(e.to_string()))?;
730
731 match self.config.sync_mode {
733 SyncMode::Fsync => {
734 file.sync_all().map_err(|e| WalError::Io(e.to_string()))?;
735 }
736 SyncMode::FdataSync => {
737 file.sync_data().map_err(|e| WalError::Io(e.to_string()))?;
738 }
739 SyncMode::None => {}
740 }
741 }
742
743 self.flushed_lsn.store(last_lsn, Ordering::Release);
745
746 let elapsed_us = start.elapsed().as_micros() as u64;
748 self.stats
749 .records_written
750 .fetch_add(record_count, Ordering::Relaxed);
751 self.stats
752 .bytes_written
753 .fetch_add(data.len() as u64, Ordering::Relaxed);
754 self.stats.flushes.fetch_add(1, Ordering::Relaxed);
755 self.stats
756 .total_batch_records
757 .fetch_add(record_count, Ordering::Relaxed);
758 self.stats
759 .total_flush_time_us
760 .fetch_add(elapsed_us, Ordering::Relaxed);
761
762 for (_, sender) in buffer.waiters.drain(..) {
764 let _ = sender.send(Ok(last_lsn));
765 }
766
767 buffer.clear();
768 Ok(())
769 }
770
771 pub fn stats(&self) -> &WalStats {
773 &self.stats
774 }
775
776 pub fn recover<R: RecoveryHandler>(&self, handler: &mut R) -> Result<RecoveryStats, WalError> {
778 let start = Instant::now();
779
780 let (dirty_pages, active_txns, last_checkpoint) = self.analysis_pass()?;
782
783 let redo_start = dirty_pages
785 .values()
786 .min()
787 .copied()
788 .unwrap_or(last_checkpoint);
789 let redo_count = self.redo_pass(redo_start, handler)?;
790
791 let undo_count = self.undo_pass(&active_txns, handler)?;
793
794 Ok(RecoveryStats {
795 analysis_time: start.elapsed(),
796 redo_records: redo_count,
797 undo_records: undo_count,
798 dirty_pages: dirty_pages.len(),
799 active_txns: active_txns.len(),
800 })
801 }
802
803 #[allow(clippy::type_complexity)]
805 fn analysis_pass(&self) -> Result<(HashMap<PageId, Lsn>, HashMap<TxnId, Lsn>, Lsn), WalError> {
806 let mut dirty_pages: HashMap<PageId, Lsn> = HashMap::new();
807 let mut active_txns: HashMap<TxnId, Lsn> = HashMap::new();
808 let mut last_checkpoint = 0;
809
810 for record in self.iter_records()? {
811 let record = record?;
812 let lsn = record.header.lsn;
813 let txn_id = record.header.txn_id;
814
815 match WalRecordType::try_from(record.header.record_type) {
816 Ok(WalRecordType::Begin) => {
817 active_txns.insert(txn_id, lsn);
818 }
819 Ok(WalRecordType::Update) => {
820 let page_id = record.header.page_id;
822 dirty_pages.entry(page_id).or_insert(lsn);
823 active_txns.insert(txn_id, lsn);
825 }
826 Ok(WalRecordType::Commit) | Ok(WalRecordType::Abort) | Ok(WalRecordType::End) => {
827 active_txns.remove(&txn_id);
828 }
829 Ok(WalRecordType::Clr) => {
830 active_txns.insert(txn_id, lsn);
831 }
832 Ok(WalRecordType::Checkpoint) => {
833 last_checkpoint = lsn;
834 }
835 Err(_) => {}
836 }
837 }
838
839 Ok((dirty_pages, active_txns, last_checkpoint))
840 }
841
842 fn redo_pass<R: RecoveryHandler>(
844 &self,
845 start_lsn: Lsn,
846 handler: &mut R,
847 ) -> Result<u64, WalError> {
848 let mut count = 0;
849
850 for record in self.iter_records_from(start_lsn)? {
851 let record = record?;
852
853 match WalRecordType::try_from(record.header.record_type) {
854 Ok(WalRecordType::Update) => {
855 handler.redo(&record)?;
856 count += 1;
857 }
858 Ok(WalRecordType::Clr) => {
859 count += 1;
861 }
862 _ => {}
863 }
864 }
865
866 Ok(count)
867 }
868
869 fn undo_pass<R: RecoveryHandler>(
871 &self,
872 active_txns: &HashMap<TxnId, Lsn>,
873 handler: &mut R,
874 ) -> Result<u64, WalError> {
875 let mut count = 0;
876
877 let mut undo_list: VecDeque<(Lsn, TxnId)> = active_txns
879 .iter()
880 .map(|(&txn_id, &lsn)| (lsn, txn_id))
881 .collect();
882 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
883
884 while let Some((lsn, txn_id)) = undo_list.pop_front() {
885 if lsn == 0 {
886 continue;
887 }
888
889 if let Some(record) = self.read_record_at(lsn)? {
891 match WalRecordType::try_from(record.header.record_type) {
892 Ok(WalRecordType::Update) => {
893 handler.undo(&record)?;
895 count += 1;
896
897 let clr_lsn = self.next_lsn();
899 let clr = WalRecord::clr(
900 clr_lsn,
901 txn_id,
902 lsn,
903 record.header.page_id,
904 record.header.offset,
905 record.header.prev_lsn,
906 );
907 self.append(clr)?;
908
909 if record.header.prev_lsn > 0 {
911 undo_list.push_back((record.header.prev_lsn, txn_id));
912 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
913 }
914 }
915 Ok(WalRecordType::Clr) => {
916 if record.after_image.len() >= 8 {
918 let undo_next =
919 u64::from_le_bytes(record.after_image[0..8].try_into().unwrap());
920 if undo_next > 0 {
921 undo_list.push_back((undo_next, txn_id));
922 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
923 }
924 }
925 }
926 _ => {
927 if record.header.prev_lsn > 0 {
929 undo_list.push_back((record.header.prev_lsn, txn_id));
930 undo_list.make_contiguous().sort_by(|a, b| b.0.cmp(&a.0));
931 }
932 }
933 }
934 }
935 }
936
937 self.force_flush()?;
938 Ok(count)
939 }
940
941 fn read_record_at(&self, target_lsn: Lsn) -> Result<Option<WalRecord>, WalError> {
943 for record in self.iter_records()? {
944 let record = record?;
945 if record.header.lsn == target_lsn {
946 return Ok(Some(record));
947 }
948 if record.header.lsn > target_lsn {
949 break;
950 }
951 }
952 Ok(None)
953 }
954
955 fn iter_records(&self) -> Result<WalIterator, WalError> {
957 self.iter_records_from(0)
958 }
959
960 fn iter_records_from(&self, start_lsn: Lsn) -> Result<WalIterator, WalError> {
962 let file_path = self.dir.join(format!(
963 "wal_{:08}.log",
964 self.file_number.load(Ordering::Relaxed)
965 ));
966
967 let file = File::open(&file_path).map_err(|e| WalError::Io(e.to_string()))?;
968
969 Ok(WalIterator {
970 file,
971 start_lsn,
972 started: false,
973 })
974 }
975}
976
977pub trait RecoveryHandler {
979 fn redo(&mut self, record: &WalRecord) -> Result<(), WalError>;
980 fn undo(&mut self, record: &WalRecord) -> Result<(), WalError>;
981}
982
983pub struct NoOpRecoveryHandler;
985
986impl RecoveryHandler for NoOpRecoveryHandler {
987 fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
988 Ok(())
989 }
990 fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
991 Ok(())
992 }
993}
994
/// Summary of a recovery run.
#[derive(Debug, Clone)]
pub struct RecoveryStats {
    /// Wall-clock duration of the recovery run.
    pub analysis_time: Duration,
    /// Records counted by the redo pass.
    pub redo_records: u64,
    /// Updates reverted by the undo pass.
    pub undo_records: u64,
    /// Distinct pages found dirty during analysis.
    pub dirty_pages: usize,
    /// Transactions still unfinished after analysis.
    pub active_txns: usize,
}
1004
1005pub struct WalIterator {
1007 file: File,
1008 start_lsn: Lsn,
1009 started: bool,
1010}
1011
1012impl Iterator for WalIterator {
1013 type Item = Result<WalRecord, WalError>;
1014
1015 fn next(&mut self) -> Option<Self::Item> {
1016 let mut header_buf = [0u8; WalRecordHeader::SIZE];
1017
1018 match self.file.read_exact(&mut header_buf) {
1019 Ok(()) => {}
1020 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
1021 Err(e) => return Some(Err(WalError::Io(e.to_string()))),
1022 }
1023
1024 let header = match WalRecordHeader::deserialize(&header_buf) {
1025 Some(h) => h,
1026 None => return Some(Err(WalError::InvalidRecord)),
1027 };
1028
1029 if !self.started {
1031 if header.lsn < self.start_lsn {
1032 let skip = header.data_length as i64 + 4;
1034 if let Err(e) = self.file.seek(SeekFrom::Current(skip)) {
1035 return Some(Err(WalError::Io(e.to_string())));
1036 }
1037 return self.next();
1038 }
1039 self.started = true;
1040 }
1041
1042 let data_len = header.data_length as usize;
1044 let mut data_buf = vec![0u8; data_len + 4]; match self.file.read_exact(&mut data_buf) {
1047 Ok(()) => {}
1048 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
1049 Err(e) => return Some(Err(WalError::Io(e.to_string()))),
1050 }
1051
1052 let mut full_buf = Vec::with_capacity(WalRecordHeader::SIZE + data_len);
1054 full_buf.extend_from_slice(&header_buf);
1055 full_buf.extend_from_slice(&data_buf[..data_len]);
1056
1057 let expected_crc = u32::from_le_bytes(data_buf[data_len..data_len + 4].try_into().unwrap());
1058 let actual_crc = crc32_of(&full_buf);
1059
1060 if expected_crc != actual_crc {
1061 return Some(Err(WalError::Corruption("CRC mismatch".to_string())));
1062 }
1063
1064 let before_end = header.before_length as usize;
1065 Some(Ok(WalRecord {
1066 header,
1067 before_image: data_buf[..before_end].to_vec(),
1068 after_image: data_buf[before_end..data_len].to_vec(),
1069 }))
1070 }
1071}
1072
#[cfg(test)]
mod tests {
    use super::*;
    use sochdb_core::ValidityBitmap;
    use std::sync::atomic::AtomicU64;
    use tempfile::TempDir;

    /// Configuration for the file-backed tests: skipping fsync keeps them fast.
    fn unsynced_config() -> WalConfig {
        WalConfig {
            sync_mode: SyncMode::None,
            ..Default::default()
        }
    }

    #[test]
    fn test_wal_record_serialization() {
        let original = WalRecord::update(1, 100, 0, 1000, 0, vec![1, 2, 3, 4], vec![5, 6, 7, 8]);
        let decoded = WalRecord::deserialize(&original.serialize()).unwrap();

        // Copy the header fields out first: the header is `#[repr(packed)]`, so
        // `assert_eq!` cannot borrow its fields in place.
        let (lsn, txn_id) = (decoded.header.lsn, decoded.header.txn_id);
        assert_eq!(lsn, 1);
        assert_eq!(txn_id, 100);
        assert_eq!(decoded.before_image, vec![1, 2, 3, 4]);
        assert_eq!(decoded.after_image, vec![5, 6, 7, 8]);
    }

    #[test]
    #[ignore]
    fn test_wal_basic_operations() {
        let dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::open(dir.path(), unsynced_config()).unwrap();

        let begin_lsn = wal.begin_txn(1).unwrap();
        assert!(begin_lsn > 0);

        let update_lsn = wal.log_update(1, 100, 0, vec![0; 10], vec![1; 10]).unwrap();
        assert!(update_lsn > begin_lsn);

        let commit_lsn = wal.commit_txn(1).unwrap();
        assert!(commit_lsn > update_lsn);

        assert!(wal.stats().records_written.load(Ordering::Relaxed) >= 3);
    }

    #[test]
    #[ignore]
    fn test_wal_group_commit() {
        let dir = TempDir::new().unwrap();
        // Thresholds high enough that the 20 small records below stay buffered until
        // `force_flush`.
        let config = WalConfig {
            sync_mode: SyncMode::None,
            buffer_size: 10_000,
            max_batch_size: 100,
            flush_interval: Duration::from_secs(10),
            ..Default::default()
        };
        let wal = WriteAheadLog::open(dir.path(), config).unwrap();

        for txn in 0..10 {
            wal.begin_txn(txn).unwrap();
            wal.log_update(txn, 100 + txn, 0, vec![0; 10], vec![1; 10])
                .unwrap();
        }
        wal.force_flush().unwrap();

        let stats = wal.stats();
        let flushes = stats.flushes.load(Ordering::Relaxed);
        let records = stats.records_written.load(Ordering::Relaxed);
        assert!(records >= 20);
        println!(
            "Flushes: {}, Records: {}, Avg batch: {:.1}",
            flushes,
            records,
            stats.avg_batch_size()
        );
    }

    #[test]
    fn test_crc32() {
        let crc = crc32_of(b"hello world");
        assert_ne!(crc, 0);
        // Deterministic for identical input...
        assert_eq!(crc, crc32_of(b"hello world"));
        // ...and sensitive to a one-character change.
        assert_ne!(crc, crc32_of(b"hello World"));
    }

    #[test]
    #[ignore]
    fn test_wal_iterator() {
        let dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::open(dir.path(), unsynced_config()).unwrap();

        wal.begin_txn(1).unwrap();
        wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
            .unwrap();
        wal.log_update(1, 101, 0, vec![7, 8, 9], vec![10, 11, 12])
            .unwrap();
        wal.force_flush().unwrap();

        // Begin plus two updates.
        assert_eq!(wal.iter_records().unwrap().count(), 3);
    }

    #[test]
    #[ignore]
    fn test_wal_persistence() {
        let dir = TempDir::new().unwrap();
        let config = unsynced_config();

        // First session writes a complete transaction.
        {
            let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();
            wal.begin_txn(1).unwrap();
            wal.log_update(1, 100, 0, vec![1, 2, 3], vec![4, 5, 6])
                .unwrap();
            wal.commit_txn(1).unwrap();
        }

        // A reopened log sees all three records.
        {
            let wal = WriteAheadLog::open(dir.path(), config).unwrap();
            assert_eq!(wal.iter_records().unwrap().count(), 3);
        }
    }

    #[test]
    #[ignore]
    fn test_wal_recovery_analysis() {
        let dir = TempDir::new().unwrap();
        let wal = WriteAheadLog::open(dir.path(), unsynced_config()).unwrap();

        // Txn 1 commits; txn 2 never finishes.
        wal.begin_txn(1).unwrap();
        wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
        wal.commit_txn(1).unwrap();

        wal.begin_txn(2).unwrap();
        wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
        wal.force_flush().unwrap();

        let (dirty_pages, active_txns, _) = wal.analysis_pass().unwrap();

        assert!(!active_txns.contains_key(&1));
        assert!(active_txns.contains_key(&2));
        assert!(dirty_pages.contains_key(&200));
    }

    /// Handler that only counts how many records each pass hands it.
    struct TestRecoveryHandler {
        redo_count: AtomicU64,
        undo_count: AtomicU64,
    }

    impl RecoveryHandler for TestRecoveryHandler {
        fn redo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
            self.redo_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn undo(&mut self, _record: &WalRecord) -> Result<(), WalError> {
            self.undo_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    #[test]
    #[ignore]
    fn test_wal_full_recovery() {
        let dir = TempDir::new().unwrap();
        let config = unsynced_config();

        // First session: txn 1 commits, txn 2 logs two updates and never finishes.
        {
            let wal = WriteAheadLog::open(dir.path(), config.clone()).unwrap();

            wal.begin_txn(1).unwrap();
            wal.log_update(1, 100, 0, vec![1, 2], vec![3, 4]).unwrap();
            wal.commit_txn(1).unwrap();

            wal.begin_txn(2).unwrap();
            wal.log_update(2, 200, 0, vec![5, 6], vec![7, 8]).unwrap();
            wal.log_update(2, 201, 0, vec![9, 10], vec![11, 12])
                .unwrap();
            wal.force_flush().unwrap();
        }

        // Second session: recover from what the first session left on disk.
        {
            let wal = WriteAheadLog::open(dir.path(), config).unwrap();
            let mut handler = TestRecoveryHandler {
                redo_count: AtomicU64::new(0),
                undo_count: AtomicU64::new(0),
            };

            let stats = wal.recover(&mut handler).unwrap();

            // All three updates are redone...
            assert_eq!(stats.redo_records, 3);
            // ...and the two belonging to the unfinished transaction are undone.
            assert_eq!(stats.undo_records, 2);
            assert_eq!(stats.active_txns, 1);
        }
    }

    #[test]
    #[ignore]
    fn test_validity_bitmap() {
        let mut bitmap = ValidityBitmap::new_all_valid(100);
        assert_eq!(bitmap.len(), 100);
        assert_eq!(bitmap.null_count(), 0);
        assert!((0..100).all(|i| bitmap.is_valid(i)));

        for idx in [10, 50, 99] {
            bitmap.set_null(idx);
        }

        assert_eq!(bitmap.null_count(), 3);
        assert!(!bitmap.is_valid(10));
        assert!(!bitmap.is_valid(50));
        assert!(!bitmap.is_valid(99));
        assert!(bitmap.is_valid(11));
    }
}