1use bytes::{Buf, BufMut};
13use featherdb_core::{Error, Lsn, Result, TransactionId, WalGroupCommitConfig, WalSyncMode};
14use parking_lot::{Condvar, Mutex, RwLock};
15use std::collections::VecDeque;
16use std::fs::{File, OpenOptions};
17use std::io::{BufReader, Read, Seek, SeekFrom, Write};
18use std::path::Path;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::Arc;
21use std::thread::{self, JoinHandle};
22use std::time::{Duration, Instant};
23
/// Kind of entry stored in the write-ahead log.
///
/// The discriminant is the single tag byte written by `WalRecord::serialize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalRecordType {
    /// Marks the start of a transaction.
    Begin = 0,
    /// Marks a transaction commit.
    Commit = 1,
    /// Marks a transaction abort.
    Abort = 2,
    /// A page write carrying `old_data` / `new_data` payloads.
    Write = 3,
    /// A checkpoint marker (not tied to a transaction).
    Checkpoint = 4,
}
39
40impl TryFrom<u8> for WalRecordType {
41 type Error = Error;
42
43 fn try_from(value: u8) -> Result<Self> {
44 match value {
45 0 => Ok(WalRecordType::Begin),
46 1 => Ok(WalRecordType::Commit),
47 2 => Ok(WalRecordType::Abort),
48 3 => Ok(WalRecordType::Write),
49 4 => Ok(WalRecordType::Checkpoint),
50 _ => Err(Error::CorruptedWal {
51 message: format!("Invalid record type: {}", value),
52 }),
53 }
54 }
55}
56
/// A single write-ahead log entry, in its in-memory form.
#[derive(Debug, Clone)]
pub struct WalRecord {
    /// Log sequence number of this record (reassigned by the WAL when the record is appended).
    pub lsn: Lsn,
    /// Transaction the record belongs to (`TransactionId::NONE` for checkpoints).
    pub txn_id: TransactionId,
    /// What kind of entry this is.
    pub record_type: WalRecordType,
    /// Caller-supplied previous LSN — presumably the prior record of the same transaction.
    pub prev_lsn: Lsn,
    /// Page identifier; the non-`Write` constructors set it to 0.
    pub page_id: u64,
    /// First payload; empty for non-`Write` constructors. Presumably the pre-image of the page.
    pub old_data: Vec<u8>,
    /// Second payload; empty for non-`Write` constructors. Presumably the post-image of the page.
    pub new_data: Vec<u8>,
}
75
76impl WalRecord {
    /// Fixed bytes per serialized record: lsn (8) + txn id (8) + type tag (1) + prev lsn (8)
    /// + page id (8) + two payload lengths (4 + 4) + trailing CRC (4).
    const HEADER_SIZE: usize = 45;
79
80 pub fn begin(lsn: Lsn, txn_id: TransactionId) -> Self {
82 WalRecord {
83 lsn,
84 txn_id,
85 record_type: WalRecordType::Begin,
86 prev_lsn: Lsn::ZERO,
87 page_id: 0,
88 old_data: vec![],
89 new_data: vec![],
90 }
91 }
92
93 pub fn commit(lsn: Lsn, txn_id: TransactionId, prev_lsn: Lsn) -> Self {
95 WalRecord {
96 lsn,
97 txn_id,
98 record_type: WalRecordType::Commit,
99 prev_lsn,
100 page_id: 0,
101 old_data: vec![],
102 new_data: vec![],
103 }
104 }
105
106 pub fn abort(lsn: Lsn, txn_id: TransactionId, prev_lsn: Lsn) -> Self {
108 WalRecord {
109 lsn,
110 txn_id,
111 record_type: WalRecordType::Abort,
112 prev_lsn,
113 page_id: 0,
114 old_data: vec![],
115 new_data: vec![],
116 }
117 }
118
119 pub fn write(
121 lsn: Lsn,
122 txn_id: TransactionId,
123 prev_lsn: Lsn,
124 page_id: u64,
125 old_data: Vec<u8>,
126 new_data: Vec<u8>,
127 ) -> Self {
128 WalRecord {
129 lsn,
130 txn_id,
131 record_type: WalRecordType::Write,
132 prev_lsn,
133 page_id,
134 old_data,
135 new_data,
136 }
137 }
138
139 pub fn checkpoint(lsn: Lsn) -> Self {
141 WalRecord {
142 lsn,
143 txn_id: TransactionId::NONE,
144 record_type: WalRecordType::Checkpoint,
145 prev_lsn: Lsn::ZERO,
146 page_id: 0,
147 old_data: vec![],
148 new_data: vec![],
149 }
150 }
151
152 pub fn serialize(&self) -> Vec<u8> {
154 let payload_size = self.old_data.len() + self.new_data.len();
155 let mut buf = Vec::with_capacity(Self::HEADER_SIZE + payload_size);
156
157 buf.put_u64_le(self.lsn.0);
158 buf.put_u64_le(self.txn_id.0);
159 buf.put_u8(self.record_type as u8);
160 buf.put_u64_le(self.prev_lsn.0);
161 buf.put_u64_le(self.page_id);
162 buf.put_u32_le(self.old_data.len() as u32);
163 buf.put_u32_le(self.new_data.len() as u32);
164 buf.extend_from_slice(&self.old_data);
165 buf.extend_from_slice(&self.new_data);
166
167 let checksum = crc32(&buf);
169 buf.put_u32_le(checksum);
170
171 buf
172 }
173
174 pub fn deserialize(data: &[u8]) -> Result<Self> {
176 if data.len() < Self::HEADER_SIZE {
177 return Err(Error::CorruptedWal {
178 message: "Record too small".into(),
179 });
180 }
181
182 let mut cursor = data;
183
184 let lsn = Lsn(cursor.get_u64_le());
185 let txn_id = TransactionId(cursor.get_u64_le());
186 let record_type = WalRecordType::try_from(cursor.get_u8())?;
187 let prev_lsn = Lsn(cursor.get_u64_le());
188 let page_id = cursor.get_u64_le();
189 let old_len = cursor.get_u32_le() as usize;
190 let new_len = cursor.get_u32_le() as usize;
191
192 if cursor.remaining() < old_len + new_len + 4 {
193 return Err(Error::CorruptedWal {
194 message: "Record data truncated".into(),
195 });
196 }
197
198 let mut old_data = vec![0u8; old_len];
199 cursor.copy_to_slice(&mut old_data);
200
201 let mut new_data = vec![0u8; new_len];
202 cursor.copy_to_slice(&mut new_data);
203
204 let stored_checksum = cursor.get_u32_le();
205
206 let data_without_checksum = &data[..data.len() - 4];
208 let computed_checksum = crc32(data_without_checksum);
209
210 if stored_checksum != computed_checksum {
211 return Err(Error::CorruptedWal {
212 message: format!(
213 "Checksum mismatch: stored {}, computed {}",
214 stored_checksum, computed_checksum
215 ),
216 });
217 }
218
219 Ok(WalRecord {
220 lsn,
221 txn_id,
222 record_type,
223 prev_lsn,
224 page_id,
225 old_data,
226 new_data,
227 })
228 }
229
230 pub fn serialized_size(&self) -> usize {
232 Self::HEADER_SIZE + self.old_data.len() + self.new_data.len()
233 }
234}
235
/// Bitwise CRC-32 using the reflected polynomial `0xEDB88320`, with an all-ones initial
/// value and a final bit inversion.
fn crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let folded = data.iter().fold(u32::MAX, |acc, &byte| {
        // Mix the byte into the low bits, then shift it out one bit at a time.
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    });

    !folded
}
251
/// Atomic counters describing WAL activity.
#[derive(Debug, Default)]
pub struct WalStats {
    /// Records currently buffered: incremented when a record is buffered, decreased on flush.
    records_buffered: AtomicU64,
    /// Number of group commits recorded.
    group_commits: AtomicU64,
    /// Sum of the batch sizes of all recorded group commits.
    group_commit_records: AtomicU64,
    /// Number of fsync calls recorded.
    fsync_count: AtomicU64,
    /// Number of records accepted by the WAL.
    total_records: AtomicU64,
}
266
267impl WalStats {
268 pub fn new() -> Self {
270 Self::default()
271 }
272
273 pub fn records_buffered(&self) -> u64 {
275 self.records_buffered.load(Ordering::Relaxed)
276 }
277
278 pub fn group_commits(&self) -> u64 {
280 self.group_commits.load(Ordering::Relaxed)
281 }
282
283 pub fn average_batch_size(&self) -> f64 {
285 let commits = self.group_commits.load(Ordering::Relaxed);
286 let records = self.group_commit_records.load(Ordering::Relaxed);
287 if commits == 0 {
288 0.0
289 } else {
290 records as f64 / commits as f64
291 }
292 }
293
294 pub fn fsync_count(&self) -> u64 {
296 self.fsync_count.load(Ordering::Relaxed)
297 }
298
299 pub fn total_records(&self) -> u64 {
301 self.total_records.load(Ordering::Relaxed)
302 }
303
304 pub fn snapshot(&self) -> WalStatsSnapshot {
306 WalStatsSnapshot {
307 records_buffered: self.records_buffered.load(Ordering::Relaxed),
308 group_commits: self.group_commits.load(Ordering::Relaxed),
309 group_commit_records: self.group_commit_records.load(Ordering::Relaxed),
310 fsync_count: self.fsync_count.load(Ordering::Relaxed),
311 total_records: self.total_records.load(Ordering::Relaxed),
312 }
313 }
314
315 fn record_buffered(&self) {
317 self.records_buffered.fetch_add(1, Ordering::Relaxed);
318 }
319
320 fn records_flushed(&self, count: u64) {
322 self.records_buffered.fetch_sub(count, Ordering::Relaxed);
323 }
324
325 fn record_group_commit(&self, batch_size: u64) {
327 self.group_commits.fetch_add(1, Ordering::Relaxed);
328 self.group_commit_records
329 .fetch_add(batch_size, Ordering::Relaxed);
330 }
331
332 fn record_fsync(&self) {
334 self.fsync_count.fetch_add(1, Ordering::Relaxed);
335 }
336
337 fn record_written(&self) {
339 self.total_records.fetch_add(1, Ordering::Relaxed);
340 }
341}
342
/// Plain-value copy of the WAL counters taken at one point in time.
#[derive(Debug, Clone, Copy)]
pub struct WalStatsSnapshot {
    /// Records buffered when the snapshot was taken.
    pub records_buffered: u64,
    /// Group commits recorded.
    pub group_commits: u64,
    /// Total records covered by those group commits.
    pub group_commit_records: u64,
    /// Fsync calls recorded.
    pub fsync_count: u64,
    /// Records accepted.
    pub total_records: u64,
}

impl WalStatsSnapshot {
    /// Mean number of records per group commit, or `0.0` when there were no group commits.
    pub fn average_batch_size(&self) -> f64 {
        match self.group_commits {
            0 => 0.0,
            commits => self.group_commit_records as f64 / commits as f64,
        }
    }
}
363
/// A commit whose caller is waiting for its record to reach disk.
struct PendingCommit {
    /// LSN of the commit record being waited on.
    lsn: Lsn,
    /// Set by the flush worker once a flush covering `lsn` has completed.
    synced: bool,
}
371
372struct WalBuffer {
374 records: VecDeque<WalRecord>,
376 pending_commits: VecDeque<PendingCommit>,
378 synced_lsn: Lsn,
380}
381
382impl WalBuffer {
383 fn new() -> Self {
384 WalBuffer {
385 records: VecDeque::new(),
386 pending_commits: VecDeque::new(),
387 synced_lsn: Lsn::ZERO,
388 }
389 }
390
391 fn is_empty(&self) -> bool {
392 self.records.is_empty()
393 }
394
395 fn len(&self) -> usize {
396 self.records.len()
397 }
398}
399
/// State owned by the background thread that performs group commits.
struct FlushWorker {
    /// Shared record buffer, drained on every flush.
    buffer: Arc<Mutex<WalBuffer>>,
    /// Signalled by appenders to wake the worker early.
    flush_cond: Arc<Condvar>,
    /// Signalled by the worker after a successful flush to wake waiting committers.
    sync_cond: Arc<Condvar>,
    /// The WAL file handle, shared with the `Wal` front end.
    file: Arc<Mutex<File>>,
    /// Group-commit tuning (flush interval and maximum batch size).
    config: WalGroupCommitConfig,
    /// Shared counters updated on each flush.
    stats: Arc<WalStats>,
    /// Set to request that the worker flush once more and exit.
    shutdown: Arc<AtomicBool>,
}
417
418impl FlushWorker {
    /// Worker loop: waits until a batch is due, flushes it, and repeats until shutdown.
    ///
    /// A flush is due when the buffer holds at least `group_commit_max_batch` records or
    /// `group_commit_interval_ms` has elapsed since the last successful flush. Any wake-up
    /// (signal or timeout) is followed by a flush attempt.
    fn run(&self) {
        let interval = Duration::from_millis(self.config.group_commit_interval_ms);
        let mut last_flush = Instant::now();

        loop {
            if self.shutdown.load(Ordering::Relaxed) {
                // Final drain so buffered records are not lost on exit.
                self.flush_buffer();
                break;
            }

            {
                let mut buffer = self.buffer.lock();
                let should_flush = !buffer.is_empty()
                    && (buffer.len() >= self.config.group_commit_max_batch
                        || last_flush.elapsed() >= interval);

                if buffer.is_empty() {
                    // Nothing to do: sleep until signalled or one interval has passed.
                    self.flush_cond.wait_for(&mut buffer, interval);
                } else if !should_flush {
                    // Records are pending but the batch is not due yet: sleep for the
                    // remainder of the interval unless signalled earlier.
                    let remaining = interval.saturating_sub(last_flush.elapsed());
                    if !remaining.is_zero() {
                        self.flush_cond.wait_for(&mut buffer, remaining);
                    }
                }
                // The buffer lock is released here; `flush_buffer` re-acquires it.
            }

            let flushed = self.flush_buffer();
            if flushed {
                last_flush = Instant::now();
            }
        }
    }
457
    /// Drains the buffer, writes the batch to the file, fsyncs, and wakes waiting committers.
    ///
    /// Returns `true` when a batch was written and synced, `false` when the buffer was empty
    /// or an I/O step failed (the failure is only reported on stderr).
    ///
    /// NOTE(review): on an I/O failure the drained records are dropped, `synced_lsn` is not
    /// advanced and `sync_cond` is not signalled, so callers waiting on those LSNs appear to
    /// block indefinitely — confirm the intended error-handling strategy.
    fn flush_buffer(&self) -> bool {
        let mut buffer = self.buffer.lock();

        if buffer.is_empty() {
            return false;
        }

        // Take ownership of everything queued so far.
        let records: Vec<_> = buffer.records.drain(..).collect();
        let record_count = records.len() as u64;

        // The buffered counter is decremented before the write is attempted.
        self.stats.records_flushed(record_count);

        let mut data = Vec::new();
        let mut max_lsn = Lsn::ZERO;

        // Frame each record as: u32 little-endian length, then the serialized bytes.
        for record in &records {
            let serialized = record.serialize();
            let len = serialized.len() as u32;
            data.extend_from_slice(&len.to_le_bytes());
            data.extend_from_slice(&serialized);
            if record.lsn > max_lsn {
                max_lsn = record.lsn;
            }
        }

        // Release the buffer so appenders are not blocked during file I/O.
        drop(buffer);

        {
            let mut file = self.file.lock();
            if let Err(e) = file.seek(SeekFrom::End(0)) {
                eprintln!("WAL seek error: {}", e);
                return false;
            }
            if let Err(e) = file.write_all(&data) {
                eprintln!("WAL write error: {}", e);
                return false;
            }
            if let Err(e) = file.sync_all() {
                eprintln!("WAL sync error: {}", e);
                return false;
            }
        }

        self.stats.record_fsync();
        self.stats.record_group_commit(record_count);

        {
            // Publish the new durable LSN and mark the commits it covers.
            let mut buffer = self.buffer.lock();
            buffer.synced_lsn = max_lsn;

            for pending in buffer.pending_commits.iter_mut() {
                if pending.lsn <= max_lsn {
                    pending.synced = true;
                }
            }
        }

        // Wake every waiter; each one re-checks `synced_lsn` against its own target.
        self.sync_cond.notify_all();

        true
    }
529}
530
/// Write-ahead log backed by a single append-only file.
pub struct Wal {
    /// The log file, shared with the flush worker.
    file: Arc<Mutex<File>>,
    /// Next LSN to hand out; its write lock also serializes appends.
    current_lsn: RwLock<Lsn>,
    /// Path the log was opened from.
    #[allow(dead_code)]
    path: std::path::PathBuf,
    /// Sync mode and group-commit tuning.
    config: WalGroupCommitConfig,
    /// Records waiting for the flush worker (group-commit mode).
    buffer: Arc<Mutex<WalBuffer>>,
    /// Wakes the flush worker.
    flush_cond: Arc<Condvar>,
    /// Wakes callers waiting for a flush to complete.
    sync_cond: Arc<Condvar>,
    /// Handle of the flush worker thread; `None` outside group-commit mode or after shutdown.
    flush_thread: Option<JoinHandle<()>>,
    /// Flag asking the flush worker to exit.
    shutdown: Arc<AtomicBool>,
    /// Activity counters.
    stats: Arc<WalStats>,
    /// Optional cap on the tracked log size, in bytes.
    max_size: Option<u64>,
    /// Tracked log size in bytes (file size at open plus bytes accepted since).
    current_size: AtomicU64,
}
559
560impl Wal {
    /// Opens (or creates) the WAL at `path` with the default configuration and no size limit.
    pub fn open(path: &Path) -> Result<Self> {
        Self::open_with_config(path, WalGroupCommitConfig::default(), None)
    }
565
566 pub fn open_with_config(
568 path: &Path,
569 config: WalGroupCommitConfig,
570 max_size: Option<u64>,
571 ) -> Result<Self> {
572 let file = OpenOptions::new()
573 .read(true)
574 .create(true)
575 .append(true)
576 .open(path)?;
577
578 let current_lsn = Self::find_last_lsn(&file)?;
580
581 let current_size = file.metadata()?.len();
583
584 let file = Arc::new(Mutex::new(file));
585 let buffer = Arc::new(Mutex::new(WalBuffer::new()));
586 let flush_cond = Arc::new(Condvar::new());
587 let sync_cond = Arc::new(Condvar::new());
588 let shutdown = Arc::new(AtomicBool::new(false));
589 let stats = Arc::new(WalStats::new());
590
591 let flush_thread = if config.sync_mode == WalSyncMode::GroupCommit {
593 let worker = FlushWorker {
594 buffer: buffer.clone(),
595 flush_cond: flush_cond.clone(),
596 sync_cond: sync_cond.clone(),
597 file: file.clone(),
598 config: config.clone(),
599 stats: stats.clone(),
600 shutdown: shutdown.clone(),
601 };
602
603 Some(thread::spawn(move || {
604 worker.run();
605 }))
606 } else {
607 None
608 };
609
610 Ok(Wal {
611 file,
612 current_lsn: RwLock::new(current_lsn),
613 path: path.to_path_buf(),
614 config,
615 buffer,
616 flush_cond,
617 sync_cond,
618 flush_thread,
619 shutdown,
620 stats,
621 max_size,
622 current_size: AtomicU64::new(current_size),
623 })
624 }
625
626 fn find_last_lsn(file: &File) -> Result<Lsn> {
628 let mut reader = BufReader::new(file);
629 let file_len = reader.seek(SeekFrom::End(0))?;
630
631 if file_len == 0 {
632 return Ok(Lsn(1)); }
634
635 reader.seek(SeekFrom::Start(0))?;
636
637 let mut last_lsn = Lsn(0);
638 let mut buf = Vec::new();
639
640 loop {
641 let mut len_buf = [0u8; 4];
643 match reader.read_exact(&mut len_buf) {
644 Ok(_) => {}
645 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
646 Err(e) => return Err(e.into()),
647 }
648
649 let record_len = u32::from_le_bytes(len_buf) as usize;
650
651 buf.resize(record_len, 0);
653 match reader.read_exact(&mut buf) {
654 Ok(_) => {}
655 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
656 Err(e) => return Err(e.into()),
657 }
658
659 if let Ok(record) = WalRecord::deserialize(&buf) {
661 last_lsn = record.lsn;
662 }
663 }
664
665 Ok(Lsn(last_lsn.0 + 1))
666 }
667
    /// Returns the LSN that the next appended record will receive.
    pub fn current_lsn(&self) -> Lsn {
        *self.current_lsn.read()
    }
672
    /// Returns `true` when the tracked log size is non-zero.
    pub fn needs_recovery(&self) -> bool {
        self.current_size.load(Ordering::Relaxed) > 0
    }
677
    /// Returns the shared activity counters.
    pub fn stats(&self) -> &Arc<WalStats> {
        &self.stats
    }
682
    /// Returns the sync mode this WAL was opened with.
    pub fn sync_mode(&self) -> WalSyncMode {
        self.config.sync_mode
    }
687
    /// Returns the tracked log size in bytes (includes records still buffered in group-commit mode).
    pub fn current_size(&self) -> u64 {
        self.current_size.load(Ordering::Relaxed)
    }
692
    /// Returns the configured size limit in bytes, or `None` when the log is unlimited.
    pub fn max_size(&self) -> Option<u64> {
        self.max_size
    }
697
698 fn check_wal_limit(&self, write_size: u64) -> Result<()> {
700 if let Some(limit) = self.max_size {
701 let current = self.current_size.load(Ordering::Relaxed);
702 if current.saturating_add(write_size) > limit {
703 return Err(Error::WalLimitExceeded {
704 current_bytes: current,
705 limit_bytes: limit,
706 });
707 }
708 }
709 Ok(())
710 }
711
712 fn append_internal(&self, mut record: WalRecord) -> Result<Lsn> {
714 let mut current_lsn = self.current_lsn.write();
715 record.lsn = *current_lsn;
716 let lsn = *current_lsn;
717 *current_lsn = current_lsn.next();
718
719 let serialized = record.serialize();
721 let write_size = (4 + serialized.len()) as u64; self.check_wal_limit(write_size)?;
725
726 self.stats.record_written();
727
728 match self.config.sync_mode {
729 WalSyncMode::Immediate => {
730 let len = serialized.len() as u32;
732
733 let mut file = self.file.lock();
734 file.seek(SeekFrom::End(0))?;
735 file.write_all(&len.to_le_bytes())?;
736 file.write_all(&serialized)?;
737
738 self.current_size.fetch_add(write_size, Ordering::Relaxed);
740 }
741 WalSyncMode::GroupCommit => {
742 let mut buffer = self.buffer.lock();
744 buffer.records.push_back(record);
745 self.stats.record_buffered();
746
747 self.current_size.fetch_add(write_size, Ordering::Relaxed);
749
750 if buffer.len() >= self.config.group_commit_max_batch {
752 self.flush_cond.notify_one();
753 }
754 }
755 WalSyncMode::NoSync => {
756 let len = serialized.len() as u32;
758
759 let mut file = self.file.lock();
760 file.seek(SeekFrom::End(0))?;
761 file.write_all(&len.to_le_bytes())?;
762 file.write_all(&serialized)?;
763
764 self.current_size.fetch_add(write_size, Ordering::Relaxed);
766 }
767 }
768
769 Ok(lsn)
770 }
771
    /// Appends `record` to the log and returns the LSN assigned to it.
    ///
    /// The record's own `lsn` field is overwritten with the assigned value.
    pub fn append(&mut self, record: WalRecord) -> Result<Lsn> {
        self.append_internal(record)
    }
776
    /// Forces buffered log data towards disk.
    ///
    /// In `Immediate` and `NoSync` modes this fsyncs the file. In `GroupCommit` mode it wakes
    /// the flush worker and waits on `sync_cond` while the record buffer is non-empty.
    ///
    /// NOTE(review): `current_lsn` is the *next* LSN to assign, so `synced_lsn < current_lsn`
    /// looks always true and the wait ends as soon as the buffer is observed empty. The flush
    /// worker empties the buffer before it writes and fsyncs, so this may return while that
    /// flush is still in flight — confirm whether callers rely on durability here.
    pub fn sync(&self) -> Result<()> {
        match self.config.sync_mode {
            WalSyncMode::Immediate | WalSyncMode::NoSync => {
                self.file.lock().sync_all()?;
                self.stats.record_fsync();
            }
            WalSyncMode::GroupCommit => {
                self.flush_cond.notify_one();

                let current_lsn = self.current_lsn();
                let mut buffer = self.buffer.lock();
                while buffer.synced_lsn < current_lsn && !buffer.is_empty() {
                    self.sync_cond.wait(&mut buffer);
                }
            }
        }
        Ok(())
    }
798
799 pub fn flush_group(&self) -> Result<Lsn> {
802 if self.config.sync_mode != WalSyncMode::GroupCommit {
803 self.sync()?;
805 return Ok(self.current_lsn());
806 }
807
808 self.flush_cond.notify_one();
810
811 let target_lsn = {
813 let current_lsn = self.current_lsn.read();
814 Lsn(current_lsn.0.saturating_sub(1)) };
816
817 let mut buffer = self.buffer.lock();
818 while buffer.synced_lsn < target_lsn {
819 self.sync_cond.wait(&mut buffer);
820 }
821
822 Ok(buffer.synced_lsn)
823 }
824
825 pub fn truncate(&mut self) -> Result<()> {
827 if self.config.sync_mode == WalSyncMode::GroupCommit {
829 self.flush_group()?;
830 }
831
832 let file = self.file.lock();
833 file.set_len(0)?;
834 file.sync_all()?;
835 *self.current_lsn.write() = Lsn(1);
836
837 self.current_size.store(0, Ordering::Relaxed);
839
840 let mut buffer = self.buffer.lock();
842 buffer.records.clear();
843 buffer.pending_commits.clear();
844 buffer.synced_lsn = Lsn::ZERO;
845
846 Ok(())
847 }
848
849 pub fn iter(&self) -> Result<WalIterator> {
851 let file = self.file.lock();
852 let mut reader = BufReader::new(file.try_clone()?);
853 reader.seek(SeekFrom::Start(0))?;
854
855 Ok(WalIterator { reader })
856 }
857
    /// Appends a `Begin` record for `txn_id` and returns its LSN.
    pub fn log_begin(&mut self, txn_id: TransactionId) -> Result<Lsn> {
        // The placeholder LSN is replaced during append.
        let record = WalRecord::begin(Lsn::ZERO, txn_id);
        self.append(record)
    }
863
864 pub fn log_commit(&mut self, txn_id: TransactionId, prev_lsn: Lsn) -> Result<Lsn> {
869 let record = WalRecord::commit(Lsn::ZERO, txn_id, prev_lsn);
870 let lsn = self.append(record)?;
871
872 match self.config.sync_mode {
873 WalSyncMode::Immediate => {
874 self.file.lock().sync_all()?;
875 self.stats.record_fsync();
876 }
877 WalSyncMode::GroupCommit => {
878 {
880 let mut buffer = self.buffer.lock();
881 buffer
882 .pending_commits
883 .push_back(PendingCommit { lsn, synced: false });
884 }
885
886 self.flush_cond.notify_one();
888
889 let mut buffer = self.buffer.lock();
891 loop {
892 if buffer.synced_lsn >= lsn {
894 buffer.pending_commits.retain(|p| p.lsn != lsn);
896 break;
897 }
898 self.sync_cond.wait(&mut buffer);
899 }
900 }
901 WalSyncMode::NoSync => {
902 }
904 }
905
906 Ok(lsn)
907 }
908
    /// Appends an `Abort` record for `txn_id` chained to `prev_lsn` and returns its LSN.
    pub fn log_abort(&mut self, txn_id: TransactionId, prev_lsn: Lsn) -> Result<Lsn> {
        // The placeholder LSN is replaced during append.
        let record = WalRecord::abort(Lsn::ZERO, txn_id, prev_lsn);
        self.append(record)
    }
914
    /// Appends a `Write` record for `page_id` carrying both payloads and returns its LSN.
    pub fn log_write(
        &mut self,
        txn_id: TransactionId,
        prev_lsn: Lsn,
        page_id: u64,
        old_data: Vec<u8>,
        new_data: Vec<u8>,
    ) -> Result<Lsn> {
        // The placeholder LSN is replaced during append.
        let record = WalRecord::write(Lsn::ZERO, txn_id, prev_lsn, page_id, old_data, new_data);
        self.append(record)
    }
927
928 pub fn log_checkpoint(&mut self) -> Result<Lsn> {
930 let record = WalRecord::checkpoint(Lsn::ZERO);
931 let lsn = self.append(record)?;
932
933 match self.config.sync_mode {
935 WalSyncMode::GroupCommit => {
936 self.flush_group()?;
937 }
938 _ => {
939 self.file.lock().sync_all()?;
940 self.stats.record_fsync();
941 }
942 }
943
944 Ok(lsn)
945 }
946
947 pub fn shutdown(&mut self) -> Result<()> {
949 self.shutdown.store(true, Ordering::Relaxed);
951
952 self.flush_cond.notify_one();
954
955 if let Some(handle) = self.flush_thread.take() {
957 handle
958 .join()
959 .map_err(|_| Error::Internal("Flush thread panicked".into()))?;
960 }
961
962 self.file.lock().sync_all()?;
964
965 Ok(())
966 }
967}
968
impl Drop for Wal {
    /// Shuts the WAL down on drop; any error is ignored because `drop` cannot report it.
    fn drop(&mut self) {
        let _ = self.shutdown();
    }
}
975
/// Sequential reader over the length-prefixed records of a WAL file.
pub struct WalIterator {
    /// Buffered reader over the log file handle the iterator owns.
    reader: BufReader<File>,
}
980
981impl Iterator for WalIterator {
982 type Item = Result<WalRecord>;
983
984 fn next(&mut self) -> Option<Self::Item> {
985 let mut len_buf = [0u8; 4];
987 match self.reader.read_exact(&mut len_buf) {
988 Ok(_) => {}
989 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return None,
990 Err(e) => return Some(Err(e.into())),
991 }
992
993 let record_len = u32::from_le_bytes(len_buf) as usize;
994
995 let mut buf = vec![0u8; record_len];
997 match self.reader.read_exact(&mut buf) {
998 Ok(_) => {}
999 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return None,
1000 Err(e) => return Some(Err(e.into())),
1001 }
1002
1003 Some(WalRecord::deserialize(&buf))
1004 }
1005}
1006
1007#[cfg(test)]
1008mod tests {
1009 use super::*;
1010 use tempfile::TempDir;
1011
1012 #[test]
1013 fn test_wal_record_serialization() {
1014 let record = WalRecord::write(
1015 Lsn(1),
1016 TransactionId(100),
1017 Lsn::ZERO,
1018 42,
1019 vec![1, 2, 3],
1020 vec![4, 5, 6],
1021 );
1022
1023 let serialized = record.serialize();
1024 let deserialized = WalRecord::deserialize(&serialized).unwrap();
1025
1026 assert_eq!(record.lsn, deserialized.lsn);
1027 assert_eq!(record.txn_id, deserialized.txn_id);
1028 assert_eq!(record.record_type, deserialized.record_type);
1029 assert_eq!(record.page_id, deserialized.page_id);
1030 assert_eq!(record.old_data, deserialized.old_data);
1031 assert_eq!(record.new_data, deserialized.new_data);
1032 }
1033
1034 #[test]
1035 fn test_wal_append_iterate() {
1036 let tmp = TempDir::new().unwrap();
1037 let wal_path = tmp.path().join("test.wal");
1038
1039 let mut wal = Wal::open(&wal_path).unwrap();
1040
1041 let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
1043 let lsn2 = wal
1044 .log_write(TransactionId(1), lsn1, 10, vec![0], vec![1])
1045 .unwrap();
1046 let _lsn3 = wal.log_commit(TransactionId(1), lsn2).unwrap();
1047
1048 let records: Vec<_> = wal.iter().unwrap().collect();
1050 assert_eq!(records.len(), 3);
1051
1052 assert_eq!(
1053 records[0].as_ref().unwrap().record_type,
1054 WalRecordType::Begin
1055 );
1056 assert_eq!(
1057 records[1].as_ref().unwrap().record_type,
1058 WalRecordType::Write
1059 );
1060 assert_eq!(
1061 records[2].as_ref().unwrap().record_type,
1062 WalRecordType::Commit
1063 );
1064 }
1065
    /// A record written and synced by one `Wal` instance is visible after reopening the file.
    #[test]
    fn test_wal_recovery() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test.wal");

        {
            // First session: write one record, sync, then drop the WAL.
            let mut wal = Wal::open(&wal_path).unwrap();
            wal.log_begin(TransactionId(1)).unwrap();
            wal.sync().unwrap();
        }

        {
            // Second session: the record must still be there.
            let wal = Wal::open(&wal_path).unwrap();
            let records: Vec<_> = wal.iter().unwrap().collect();
            assert_eq!(records.len(), 1);
        }
    }
1085
    /// Group-commit mode: five committed transactions end up on disk and are counted in stats.
    #[test]
    fn test_wal_group_commit() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_group.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::GroupCommit,
            group_commit_interval_ms: 50,
            group_commit_max_batch: 10,
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();

        for i in 1..=5 {
            // Each transaction logs begin, write and commit; the commit waits for the flush.
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            let lsn2 = wal
                .log_write(txn_id, lsn1, i * 10, vec![0], vec![1])
                .unwrap();
            wal.log_commit(txn_id, lsn2).unwrap();
        }

        let records: Vec<_> = wal.iter().unwrap().collect();
        // 5 transactions x 3 records each.
        assert_eq!(records.len(), 15);

        let stats = wal.stats();
        assert!(stats.group_commits() >= 1);
        assert_eq!(stats.total_records(), 15);

        wal.shutdown().unwrap();
    }
1121
    /// `NoSync` mode performs no fsync on commit; an explicit `sync` performs exactly one.
    #[test]
    fn test_wal_no_sync_mode() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_nosync.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::NoSync,
            ..Default::default()
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();

        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        let lsn2 = wal
            .log_write(TransactionId(1), lsn1, 10, vec![0], vec![1])
            .unwrap();
        wal.log_commit(TransactionId(1), lsn2).unwrap();

        let stats = wal.stats();
        // No fsync has been recorded so far.
        assert_eq!(stats.fsync_count(), 0);

        wal.sync().unwrap();
        // The explicit sync is the only fsync.
        assert_eq!(stats.fsync_count(), 1);
    }
1149
    /// With the default configuration, one transaction yields three records and at least one fsync.
    #[test]
    fn test_wal_stats() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_stats.wal");

        let mut wal = Wal::open(&wal_path).unwrap();

        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        let lsn2 = wal
            .log_write(TransactionId(1), lsn1, 10, vec![0], vec![1])
            .unwrap();
        wal.log_commit(TransactionId(1), lsn2).unwrap();

        let stats = wal.stats();
        assert_eq!(stats.total_records(), 3);
        assert!(stats.fsync_count() >= 1);
    }
1168
    /// Group-commit mode with a long interval: commits still reach disk before `log_commit` returns.
    #[test]
    fn test_wal_batch_flush() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_batch.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::GroupCommit,
            group_commit_interval_ms: 1000,
            group_commit_max_batch: 5,
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();

        for i in 1..=3 {
            // Two records per transaction: begin and commit.
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            wal.log_commit(txn_id, lsn1).unwrap();
        }

        let records: Vec<_> = wal.iter().unwrap().collect();
        // 3 transactions x 2 records each.
        assert_eq!(records.len(), 6);

        wal.shutdown().unwrap();
    }
1195
    /// With a 10 KiB limit, appends eventually fail with `WalLimitExceeded`.
    #[test]
    fn test_wal_size_limit() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_limited.wal");

        let max_size = Some(10 * 1024);
        let mut wal =
            Wal::open_with_config(&wal_path, WalGroupCommitConfig::default(), max_size).unwrap();

        let mut count = 0;
        // Keep appending until the limit rejects a record.
        loop {
            let txn_id = TransactionId(count + 1);
            match wal.log_begin(txn_id) {
                Ok(_) => {
                    count += 1;
                    if count > 1000 {
                        // Safety net so a broken limit cannot loop forever.
                        panic!("Should have hit WAL limit by now");
                    }
                }
                Err(Error::WalLimitExceeded { .. }) => {
                    break;
                }
                Err(e) => {
                    panic!("Unexpected error: {}", e);
                }
            }
        }

        assert!(count > 0, "Should have written at least some records");
        assert!(count < 1000, "Should have hit limit before 1000 records");

        wal.shutdown().unwrap();
    }
1233
    /// Without a size limit, 100 transactions can be logged without error.
    #[test]
    fn test_wal_unlimited() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_unlimited.wal");

        let mut wal =
            Wal::open_with_config(&wal_path, WalGroupCommitConfig::default(), None).unwrap();

        for i in 1..=100 {
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            wal.log_commit(txn_id, lsn1).unwrap();
        }

        wal.shutdown().unwrap();
    }
1252
    /// `truncate` resets the tracked size to zero and the WAL accepts writes afterwards.
    #[test]
    fn test_wal_truncate_resets_size() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_truncate.wal");

        let max_size = Some(100 * 1024);
        let mut wal =
            Wal::open_with_config(&wal_path, WalGroupCommitConfig::default(), max_size).unwrap();

        for i in 1..=10 {
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            wal.log_commit(txn_id, lsn1).unwrap();
        }

        let size_before = wal.current_size.load(Ordering::Relaxed);
        assert!(size_before > 0);

        wal.truncate().unwrap();

        let size_after = wal.current_size.load(Ordering::Relaxed);
        assert_eq!(size_after, 0);

        // The log must still be usable after truncation.
        let lsn1 = wal.log_begin(TransactionId(100)).unwrap();
        wal.log_commit(TransactionId(100), lsn1).unwrap();

        wal.shutdown().unwrap();
    }
1284}