Skip to main content

featherdb_storage/
wal.rs

1//! Write-Ahead Log (WAL) for durability with Group Commit support
2//!
3//! This module provides a WAL implementation with three sync modes:
4//! - `Immediate`: Sync on every commit (safest, slowest)
5//! - `GroupCommit`: Batch multiple commits into a single fsync (2-5x faster)
6//! - `NoSync`: No sync - data may be lost on crash (fastest, least safe)
7//!
8//! Group commit works by buffering WAL records and periodically flushing them
9//! to disk with a single fsync operation. This amortizes the cost of fsync
10//! across multiple transactions, significantly improving throughput.
11
12use bytes::{Buf, BufMut};
13use featherdb_core::{Error, Lsn, Result, TransactionId, WalGroupCommitConfig, WalSyncMode};
14use parking_lot::{Condvar, Mutex, RwLock};
15use std::collections::VecDeque;
16use std::fs::{File, OpenOptions};
17use std::io::{BufReader, Read, Seek, SeekFrom, Write};
18use std::path::Path;
19use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20use std::sync::Arc;
21use std::thread::{self, JoinHandle};
22use std::time::{Duration, Instant};
23
24/// WAL record types
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26#[repr(u8)]
27pub enum WalRecordType {
28    /// Transaction begin
29    Begin = 0,
30    /// Transaction commit
31    Commit = 1,
32    /// Transaction abort
33    Abort = 2,
34    /// Page write
35    Write = 3,
36    /// Checkpoint marker
37    Checkpoint = 4,
38}
39
40impl TryFrom<u8> for WalRecordType {
41    type Error = Error;
42
43    fn try_from(value: u8) -> Result<Self> {
44        match value {
45            0 => Ok(WalRecordType::Begin),
46            1 => Ok(WalRecordType::Commit),
47            2 => Ok(WalRecordType::Abort),
48            3 => Ok(WalRecordType::Write),
49            4 => Ok(WalRecordType::Checkpoint),
50            _ => Err(Error::CorruptedWal {
51                message: format!("Invalid record type: {}", value),
52            }),
53        }
54    }
55}
56
57/// A WAL record
58#[derive(Debug, Clone)]
59pub struct WalRecord {
60    /// Log sequence number
61    pub lsn: Lsn,
62    /// Transaction ID
63    pub txn_id: TransactionId,
64    /// Record type
65    pub record_type: WalRecordType,
66    /// Previous LSN for this transaction (for undo chain)
67    pub prev_lsn: Lsn,
68    /// Page ID (for Write records)
69    pub page_id: u64,
70    /// Old page data (for undo)
71    pub old_data: Vec<u8>,
72    /// New page data (for redo)
73    pub new_data: Vec<u8>,
74}
75
76impl WalRecord {
77    /// Header size: lsn(8) + txn_id(8) + type(1) + prev_lsn(8) + page_id(8) + old_len(4) + new_len(4) + checksum(4)
78    const HEADER_SIZE: usize = 45;
79
80    /// Create a begin record
81    pub fn begin(lsn: Lsn, txn_id: TransactionId) -> Self {
82        WalRecord {
83            lsn,
84            txn_id,
85            record_type: WalRecordType::Begin,
86            prev_lsn: Lsn::ZERO,
87            page_id: 0,
88            old_data: vec![],
89            new_data: vec![],
90        }
91    }
92
93    /// Create a commit record
94    pub fn commit(lsn: Lsn, txn_id: TransactionId, prev_lsn: Lsn) -> Self {
95        WalRecord {
96            lsn,
97            txn_id,
98            record_type: WalRecordType::Commit,
99            prev_lsn,
100            page_id: 0,
101            old_data: vec![],
102            new_data: vec![],
103        }
104    }
105
106    /// Create an abort record
107    pub fn abort(lsn: Lsn, txn_id: TransactionId, prev_lsn: Lsn) -> Self {
108        WalRecord {
109            lsn,
110            txn_id,
111            record_type: WalRecordType::Abort,
112            prev_lsn,
113            page_id: 0,
114            old_data: vec![],
115            new_data: vec![],
116        }
117    }
118
119    /// Create a write record
120    pub fn write(
121        lsn: Lsn,
122        txn_id: TransactionId,
123        prev_lsn: Lsn,
124        page_id: u64,
125        old_data: Vec<u8>,
126        new_data: Vec<u8>,
127    ) -> Self {
128        WalRecord {
129            lsn,
130            txn_id,
131            record_type: WalRecordType::Write,
132            prev_lsn,
133            page_id,
134            old_data,
135            new_data,
136        }
137    }
138
139    /// Create a checkpoint record
140    pub fn checkpoint(lsn: Lsn) -> Self {
141        WalRecord {
142            lsn,
143            txn_id: TransactionId::NONE,
144            record_type: WalRecordType::Checkpoint,
145            prev_lsn: Lsn::ZERO,
146            page_id: 0,
147            old_data: vec![],
148            new_data: vec![],
149        }
150    }
151
152    /// Serialize the record to bytes
153    pub fn serialize(&self) -> Vec<u8> {
154        let payload_size = self.old_data.len() + self.new_data.len();
155        let mut buf = Vec::with_capacity(Self::HEADER_SIZE + payload_size);
156
157        buf.put_u64_le(self.lsn.0);
158        buf.put_u64_le(self.txn_id.0);
159        buf.put_u8(self.record_type as u8);
160        buf.put_u64_le(self.prev_lsn.0);
161        buf.put_u64_le(self.page_id);
162        buf.put_u32_le(self.old_data.len() as u32);
163        buf.put_u32_le(self.new_data.len() as u32);
164        buf.extend_from_slice(&self.old_data);
165        buf.extend_from_slice(&self.new_data);
166
167        // Compute CRC32 checksum
168        let checksum = crc32(&buf);
169        buf.put_u32_le(checksum);
170
171        buf
172    }
173
174    /// Deserialize a record from bytes
175    pub fn deserialize(data: &[u8]) -> Result<Self> {
176        if data.len() < Self::HEADER_SIZE {
177            return Err(Error::CorruptedWal {
178                message: "Record too small".into(),
179            });
180        }
181
182        let mut cursor = data;
183
184        let lsn = Lsn(cursor.get_u64_le());
185        let txn_id = TransactionId(cursor.get_u64_le());
186        let record_type = WalRecordType::try_from(cursor.get_u8())?;
187        let prev_lsn = Lsn(cursor.get_u64_le());
188        let page_id = cursor.get_u64_le();
189        let old_len = cursor.get_u32_le() as usize;
190        let new_len = cursor.get_u32_le() as usize;
191
192        if cursor.remaining() < old_len + new_len + 4 {
193            return Err(Error::CorruptedWal {
194                message: "Record data truncated".into(),
195            });
196        }
197
198        let mut old_data = vec![0u8; old_len];
199        cursor.copy_to_slice(&mut old_data);
200
201        let mut new_data = vec![0u8; new_len];
202        cursor.copy_to_slice(&mut new_data);
203
204        let stored_checksum = cursor.get_u32_le();
205
206        // Verify checksum
207        let data_without_checksum = &data[..data.len() - 4];
208        let computed_checksum = crc32(data_without_checksum);
209
210        if stored_checksum != computed_checksum {
211            return Err(Error::CorruptedWal {
212                message: format!(
213                    "Checksum mismatch: stored {}, computed {}",
214                    stored_checksum, computed_checksum
215                ),
216            });
217        }
218
219        Ok(WalRecord {
220            lsn,
221            txn_id,
222            record_type,
223            prev_lsn,
224            page_id,
225            old_data,
226            new_data,
227        })
228    }
229
230    /// Get the total serialized size
231    pub fn serialized_size(&self) -> usize {
232        Self::HEADER_SIZE + self.old_data.len() + self.new_data.len()
233    }
234}
235
/// Bitwise (table-free) CRC-32 using the reflected polynomial `0xEDB88320`.
///
/// Uses an all-ones initial value and a final bit inversion. This is the common
/// IEEE/zlib CRC-32; the check value for `b"123456789"` is `0xCBF43926`.
fn crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let remainder = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |crc, _| {
            let shifted = crc >> 1;
            if crc & 1 == 1 {
                shifted ^ POLY
            } else {
                shifted
            }
        })
    });

    !remainder
}
251
/// Statistics for WAL operations.
///
/// Every counter is an `AtomicU64` accessed with `Ordering::Relaxed`. Each value is
/// individually consistent, but reading several counters is not an atomic snapshot.
#[derive(Debug, Default)]
pub struct WalStats {
    /// Records currently buffered for group commit (incremented by `record_buffered`,
    /// decremented by `records_flushed`).
    records_buffered: AtomicU64,
    /// Number of group commits performed.
    group_commits: AtomicU64,
    /// Sum of the batch sizes passed to `record_group_commit`.
    group_commit_records: AtomicU64,
    /// Number of fsync operations recorded.
    fsync_count: AtomicU64,
    /// Records written, in every sync mode.
    total_records: AtomicU64,
}

impl WalStats {
    /// Create a zeroed set of counters (equivalent to `WalStats::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Relaxed load shared by all getters.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Relaxed increment shared by all recorders.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Number of records currently buffered.
    pub fn records_buffered(&self) -> u64 {
        Self::read(&self.records_buffered)
    }

    /// Total number of group commits.
    pub fn group_commits(&self) -> u64 {
        Self::read(&self.group_commits)
    }

    /// Mean records per group commit, or `0.0` before the first group commit.
    pub fn average_batch_size(&self) -> f64 {
        let commits = Self::read(&self.group_commits);
        let records = Self::read(&self.group_commit_records);
        match commits {
            0 => 0.0,
            n => records as f64 / n as f64,
        }
    }

    /// Total fsync count.
    pub fn fsync_count(&self) -> u64 {
        Self::read(&self.fsync_count)
    }

    /// Total records written.
    pub fn total_records(&self) -> u64 {
        Self::read(&self.total_records)
    }

    /// Copy every counter into a plain [`WalStatsSnapshot`].
    pub fn snapshot(&self) -> WalStatsSnapshot {
        WalStatsSnapshot {
            records_buffered: Self::read(&self.records_buffered),
            group_commits: Self::read(&self.group_commits),
            group_commit_records: Self::read(&self.group_commit_records),
            fsync_count: Self::read(&self.fsync_count),
            total_records: Self::read(&self.total_records),
        }
    }

    /// Count one newly buffered record.
    fn record_buffered(&self) {
        Self::bump(&self.records_buffered, 1);
    }

    /// Remove `count` records from the buffered tally.
    fn records_flushed(&self, count: u64) {
        self.records_buffered.fetch_sub(count, Ordering::Relaxed);
    }

    /// Count one group commit that wrote `batch_size` records.
    fn record_group_commit(&self, batch_size: u64) {
        Self::bump(&self.group_commits, 1);
        Self::bump(&self.group_commit_records, batch_size);
    }

    /// Count one fsync.
    fn record_fsync(&self) {
        Self::bump(&self.fsync_count, 1);
    }

    /// Count one written record.
    fn record_written(&self) {
        Self::bump(&self.total_records, 1);
    }
}

/// Immutable, plain-integer copy of [`WalStats`] taken at one point in time.
#[derive(Debug, Clone, Copy)]
pub struct WalStatsSnapshot {
    pub records_buffered: u64,
    pub group_commits: u64,
    pub group_commit_records: u64,
    pub fsync_count: u64,
    pub total_records: u64,
}

impl WalStatsSnapshot {
    /// Mean records per group commit, or `0.0` when no group commit happened.
    pub fn average_batch_size(&self) -> f64 {
        if self.group_commits > 0 {
            self.group_commit_records as f64 / self.group_commits as f64
        } else {
            0.0
        }
    }
}
363
364/// A pending commit waiting for group flush
365struct PendingCommit {
366    /// The LSN of this commit
367    lsn: Lsn,
368    /// Whether this commit has been synced
369    synced: bool,
370}
371
372/// Internal buffer for group commit
373struct WalBuffer {
374    /// Buffered records waiting to be flushed
375    records: VecDeque<WalRecord>,
376    /// Pending commits waiting for sync notification
377    pending_commits: VecDeque<PendingCommit>,
378    /// LSN up to which data has been synced
379    synced_lsn: Lsn,
380}
381
382impl WalBuffer {
383    fn new() -> Self {
384        WalBuffer {
385            records: VecDeque::new(),
386            pending_commits: VecDeque::new(),
387            synced_lsn: Lsn::ZERO,
388        }
389    }
390
391    fn is_empty(&self) -> bool {
392        self.records.is_empty()
393    }
394
395    fn len(&self) -> usize {
396        self.records.len()
397    }
398}
399
400/// Background flusher thread state
401struct FlushWorker {
402    /// Shared buffer
403    buffer: Arc<Mutex<WalBuffer>>,
404    /// Condition variable for waking the flusher
405    flush_cond: Arc<Condvar>,
406    /// Condition variable for notifying waiters
407    sync_cond: Arc<Condvar>,
408    /// WAL file
409    file: Arc<Mutex<File>>,
410    /// Configuration
411    config: WalGroupCommitConfig,
412    /// Statistics
413    stats: Arc<WalStats>,
414    /// Shutdown flag
415    shutdown: Arc<AtomicBool>,
416}
417
418impl FlushWorker {
419    fn run(&self) {
420        let interval = Duration::from_millis(self.config.group_commit_interval_ms);
421        let mut last_flush = Instant::now();
422
423        loop {
424            // Check for shutdown
425            if self.shutdown.load(Ordering::Relaxed) {
426                // Flush any remaining records before shutting down
427                self.flush_buffer();
428                break;
429            }
430
431            // Wait for records or timeout
432            {
433                let mut buffer = self.buffer.lock();
434                let should_flush = !buffer.is_empty()
435                    && (buffer.len() >= self.config.group_commit_max_batch
436                        || last_flush.elapsed() >= interval);
437
438                if buffer.is_empty() {
439                    // Wait for new records with timeout
440                    self.flush_cond.wait_for(&mut buffer, interval);
441                } else if !should_flush {
442                    // Wait for more records or timeout
443                    let remaining = interval.saturating_sub(last_flush.elapsed());
444                    if !remaining.is_zero() {
445                        self.flush_cond.wait_for(&mut buffer, remaining);
446                    }
447                }
448            }
449
450            // Flush if we have records
451            let flushed = self.flush_buffer();
452            if flushed {
453                last_flush = Instant::now();
454            }
455        }
456    }
457
458    fn flush_buffer(&self) -> bool {
459        let mut buffer = self.buffer.lock();
460
461        if buffer.is_empty() {
462            return false;
463        }
464
465        // Take all records from the buffer
466        let records: Vec<_> = buffer.records.drain(..).collect();
467        let record_count = records.len() as u64;
468
469        // Update stats
470        self.stats.records_flushed(record_count);
471
472        // Serialize all records
473        let mut data = Vec::new();
474        let mut max_lsn = Lsn::ZERO;
475
476        for record in &records {
477            let serialized = record.serialize();
478            let len = serialized.len() as u32;
479            data.extend_from_slice(&len.to_le_bytes());
480            data.extend_from_slice(&serialized);
481            if record.lsn > max_lsn {
482                max_lsn = record.lsn;
483            }
484        }
485
486        // Release buffer lock before I/O
487        drop(buffer);
488
489        // Write all records at once
490        {
491            let mut file = self.file.lock();
492            if let Err(e) = file.seek(SeekFrom::End(0)) {
493                eprintln!("WAL seek error: {}", e);
494                return false;
495            }
496            if let Err(e) = file.write_all(&data) {
497                eprintln!("WAL write error: {}", e);
498                return false;
499            }
500            // Single fsync for all records
501            if let Err(e) = file.sync_all() {
502                eprintln!("WAL sync error: {}", e);
503                return false;
504            }
505        }
506
507        // Update stats
508        self.stats.record_fsync();
509        self.stats.record_group_commit(record_count);
510
511        // Update synced LSN and notify waiters
512        {
513            let mut buffer = self.buffer.lock();
514            buffer.synced_lsn = max_lsn;
515
516            // Mark all pending commits up to this LSN as synced
517            for pending in buffer.pending_commits.iter_mut() {
518                if pending.lsn <= max_lsn {
519                    pending.synced = true;
520                }
521            }
522        }
523
524        // Notify all waiters
525        self.sync_cond.notify_all();
526
527        true
528    }
529}
530
531/// Write-Ahead Log manager with group commit support
532pub struct Wal {
533    /// WAL file
534    file: Arc<Mutex<File>>,
535    /// Current LSN (protected by RwLock for concurrent reads)
536    current_lsn: RwLock<Lsn>,
537    /// File path (kept for diagnostics/logging)
538    #[allow(dead_code)]
539    path: std::path::PathBuf,
540    /// Configuration
541    config: WalGroupCommitConfig,
542    /// Group commit buffer
543    buffer: Arc<Mutex<WalBuffer>>,
544    /// Condition variable for waking the flusher
545    flush_cond: Arc<Condvar>,
546    /// Condition variable for notifying commit waiters
547    sync_cond: Arc<Condvar>,
548    /// Background flush thread handle
549    flush_thread: Option<JoinHandle<()>>,
550    /// Shutdown flag
551    shutdown: Arc<AtomicBool>,
552    /// Statistics
553    stats: Arc<WalStats>,
554    /// Maximum WAL file size in bytes (None = unlimited)
555    max_size: Option<u64>,
556    /// Current WAL file size in bytes (approximate, tracked via writes)
557    current_size: AtomicU64,
558}
559
560impl Wal {
561    /// Open or create a WAL file with default configuration
562    pub fn open(path: &Path) -> Result<Self> {
563        Self::open_with_config(path, WalGroupCommitConfig::default(), None)
564    }
565
566    /// Open or create a WAL file with custom configuration
567    pub fn open_with_config(
568        path: &Path,
569        config: WalGroupCommitConfig,
570        max_size: Option<u64>,
571    ) -> Result<Self> {
572        let file = OpenOptions::new()
573            .read(true)
574            .create(true)
575            .append(true)
576            .open(path)?;
577
578        // Find the last LSN by scanning the file
579        let current_lsn = Self::find_last_lsn(&file)?;
580
581        // Get current file size
582        let current_size = file.metadata()?.len();
583
584        let file = Arc::new(Mutex::new(file));
585        let buffer = Arc::new(Mutex::new(WalBuffer::new()));
586        let flush_cond = Arc::new(Condvar::new());
587        let sync_cond = Arc::new(Condvar::new());
588        let shutdown = Arc::new(AtomicBool::new(false));
589        let stats = Arc::new(WalStats::new());
590
591        // Start background flush thread for group commit mode
592        let flush_thread = if config.sync_mode == WalSyncMode::GroupCommit {
593            let worker = FlushWorker {
594                buffer: buffer.clone(),
595                flush_cond: flush_cond.clone(),
596                sync_cond: sync_cond.clone(),
597                file: file.clone(),
598                config: config.clone(),
599                stats: stats.clone(),
600                shutdown: shutdown.clone(),
601            };
602
603            Some(thread::spawn(move || {
604                worker.run();
605            }))
606        } else {
607            None
608        };
609
610        Ok(Wal {
611            file,
612            current_lsn: RwLock::new(current_lsn),
613            path: path.to_path_buf(),
614            config,
615            buffer,
616            flush_cond,
617            sync_cond,
618            flush_thread,
619            shutdown,
620            stats,
621            max_size,
622            current_size: AtomicU64::new(current_size),
623        })
624    }
625
626    /// Find the last LSN in the WAL file
627    fn find_last_lsn(file: &File) -> Result<Lsn> {
628        let mut reader = BufReader::new(file);
629        let file_len = reader.seek(SeekFrom::End(0))?;
630
631        if file_len == 0 {
632            return Ok(Lsn(1)); // Start from 1
633        }
634
635        reader.seek(SeekFrom::Start(0))?;
636
637        let mut last_lsn = Lsn(0);
638        let mut buf = Vec::new();
639
640        loop {
641            // Read record length prefix (4 bytes)
642            let mut len_buf = [0u8; 4];
643            match reader.read_exact(&mut len_buf) {
644                Ok(_) => {}
645                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
646                Err(e) => return Err(e.into()),
647            }
648
649            let record_len = u32::from_le_bytes(len_buf) as usize;
650
651            // Read record data
652            buf.resize(record_len, 0);
653            match reader.read_exact(&mut buf) {
654                Ok(_) => {}
655                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
656                Err(e) => return Err(e.into()),
657            }
658
659            // Parse to get LSN
660            if let Ok(record) = WalRecord::deserialize(&buf) {
661                last_lsn = record.lsn;
662            }
663        }
664
665        Ok(Lsn(last_lsn.0 + 1))
666    }
667
668    /// Get the current LSN
669    pub fn current_lsn(&self) -> Lsn {
670        *self.current_lsn.read()
671    }
672
673    /// Check if WAL has records that need recovery (non-empty WAL)
674    pub fn needs_recovery(&self) -> bool {
675        self.current_size.load(Ordering::Relaxed) > 0
676    }
677
678    /// Get WAL statistics
679    pub fn stats(&self) -> &Arc<WalStats> {
680        &self.stats
681    }
682
683    /// Get the sync mode
684    pub fn sync_mode(&self) -> WalSyncMode {
685        self.config.sync_mode
686    }
687
688    /// Get current WAL file size in bytes
689    pub fn current_size(&self) -> u64 {
690        self.current_size.load(Ordering::Relaxed)
691    }
692
693    /// Get configured maximum WAL size (None if unlimited)
694    pub fn max_size(&self) -> Option<u64> {
695        self.max_size
696    }
697
698    /// Check if WAL write would exceed limit
699    fn check_wal_limit(&self, write_size: u64) -> Result<()> {
700        if let Some(limit) = self.max_size {
701            let current = self.current_size.load(Ordering::Relaxed);
702            if current.saturating_add(write_size) > limit {
703                return Err(Error::WalLimitExceeded {
704                    current_bytes: current,
705                    limit_bytes: limit,
706                });
707            }
708        }
709        Ok(())
710    }
711
712    /// Append a record to the WAL (internal, doesn't sync)
713    fn append_internal(&self, mut record: WalRecord) -> Result<Lsn> {
714        let mut current_lsn = self.current_lsn.write();
715        record.lsn = *current_lsn;
716        let lsn = *current_lsn;
717        *current_lsn = current_lsn.next();
718
719        // Calculate the write size (length prefix + serialized data)
720        let serialized = record.serialize();
721        let write_size = (4 + serialized.len()) as u64; // 4 bytes for length prefix
722
723        // Check WAL size limit before writing
724        self.check_wal_limit(write_size)?;
725
726        self.stats.record_written();
727
728        match self.config.sync_mode {
729            WalSyncMode::Immediate => {
730                // Write directly to file
731                let len = serialized.len() as u32;
732
733                let mut file = self.file.lock();
734                file.seek(SeekFrom::End(0))?;
735                file.write_all(&len.to_le_bytes())?;
736                file.write_all(&serialized)?;
737
738                // Update size counter
739                self.current_size.fetch_add(write_size, Ordering::Relaxed);
740            }
741            WalSyncMode::GroupCommit => {
742                // Add to buffer
743                let mut buffer = self.buffer.lock();
744                buffer.records.push_back(record);
745                self.stats.record_buffered();
746
747                // Update size counter (estimate - actual write happens in flush)
748                self.current_size.fetch_add(write_size, Ordering::Relaxed);
749
750                // Wake flusher if batch is full
751                if buffer.len() >= self.config.group_commit_max_batch {
752                    self.flush_cond.notify_one();
753                }
754            }
755            WalSyncMode::NoSync => {
756                // Write directly to file without sync
757                let len = serialized.len() as u32;
758
759                let mut file = self.file.lock();
760                file.seek(SeekFrom::End(0))?;
761                file.write_all(&len.to_le_bytes())?;
762                file.write_all(&serialized)?;
763
764                // Update size counter
765                self.current_size.fetch_add(write_size, Ordering::Relaxed);
766            }
767        }
768
769        Ok(lsn)
770    }
771
772    /// Append a record to the WAL
773    pub fn append(&mut self, record: WalRecord) -> Result<Lsn> {
774        self.append_internal(record)
775    }
776
777    /// Sync the WAL to disk
778    pub fn sync(&self) -> Result<()> {
779        match self.config.sync_mode {
780            WalSyncMode::Immediate | WalSyncMode::NoSync => {
781                self.file.lock().sync_all()?;
782                self.stats.record_fsync();
783            }
784            WalSyncMode::GroupCommit => {
785                // Force a flush of the buffer
786                self.flush_cond.notify_one();
787
788                // Wait for sync to complete
789                let current_lsn = self.current_lsn();
790                let mut buffer = self.buffer.lock();
791                while buffer.synced_lsn < current_lsn && !buffer.is_empty() {
792                    self.sync_cond.wait(&mut buffer);
793                }
794            }
795        }
796        Ok(())
797    }
798
799    /// Flush the group commit buffer (for group commit mode)
800    /// Returns the LSN up to which records were synced
801    pub fn flush_group(&self) -> Result<Lsn> {
802        if self.config.sync_mode != WalSyncMode::GroupCommit {
803            // In non-group-commit mode, just sync
804            self.sync()?;
805            return Ok(self.current_lsn());
806        }
807
808        // Wake the flusher
809        self.flush_cond.notify_one();
810
811        // Wait for the flush to complete
812        let target_lsn = {
813            let current_lsn = self.current_lsn.read();
814            Lsn(current_lsn.0.saturating_sub(1)) // Last written LSN
815        };
816
817        let mut buffer = self.buffer.lock();
818        while buffer.synced_lsn < target_lsn {
819            self.sync_cond.wait(&mut buffer);
820        }
821
822        Ok(buffer.synced_lsn)
823    }
824
825    /// Truncate the WAL (after checkpoint)
826    pub fn truncate(&mut self) -> Result<()> {
827        // Ensure all buffered records are flushed first
828        if self.config.sync_mode == WalSyncMode::GroupCommit {
829            self.flush_group()?;
830        }
831
832        let file = self.file.lock();
833        file.set_len(0)?;
834        file.sync_all()?;
835        *self.current_lsn.write() = Lsn(1);
836
837        // Reset size counter
838        self.current_size.store(0, Ordering::Relaxed);
839
840        // Clear the buffer
841        let mut buffer = self.buffer.lock();
842        buffer.records.clear();
843        buffer.pending_commits.clear();
844        buffer.synced_lsn = Lsn::ZERO;
845
846        Ok(())
847    }
848
849    /// Iterate over all records in the WAL
850    pub fn iter(&self) -> Result<WalIterator> {
851        let file = self.file.lock();
852        let mut reader = BufReader::new(file.try_clone()?);
853        reader.seek(SeekFrom::Start(0))?;
854
855        Ok(WalIterator { reader })
856    }
857
858    /// Log a transaction begin
859    pub fn log_begin(&mut self, txn_id: TransactionId) -> Result<Lsn> {
860        let record = WalRecord::begin(Lsn::ZERO, txn_id);
861        self.append(record)
862    }
863
864    /// Log a transaction commit
865    ///
866    /// In GroupCommit mode, this adds the commit to the buffer and waits
867    /// for the group flush to complete before returning.
868    pub fn log_commit(&mut self, txn_id: TransactionId, prev_lsn: Lsn) -> Result<Lsn> {
869        let record = WalRecord::commit(Lsn::ZERO, txn_id, prev_lsn);
870        let lsn = self.append(record)?;
871
872        match self.config.sync_mode {
873            WalSyncMode::Immediate => {
874                self.file.lock().sync_all()?;
875                self.stats.record_fsync();
876            }
877            WalSyncMode::GroupCommit => {
878                // Add to pending commits and wait for sync
879                {
880                    let mut buffer = self.buffer.lock();
881                    buffer
882                        .pending_commits
883                        .push_back(PendingCommit { lsn, synced: false });
884                }
885
886                // Wake the flusher
887                self.flush_cond.notify_one();
888
889                // Wait for this commit to be synced
890                let mut buffer = self.buffer.lock();
891                loop {
892                    // Check if our commit is synced
893                    if buffer.synced_lsn >= lsn {
894                        // Remove our pending commit
895                        buffer.pending_commits.retain(|p| p.lsn != lsn);
896                        break;
897                    }
898                    self.sync_cond.wait(&mut buffer);
899                }
900            }
901            WalSyncMode::NoSync => {
902                // No sync needed
903            }
904        }
905
906        Ok(lsn)
907    }
908
909    /// Log a transaction abort
910    pub fn log_abort(&mut self, txn_id: TransactionId, prev_lsn: Lsn) -> Result<Lsn> {
911        let record = WalRecord::abort(Lsn::ZERO, txn_id, prev_lsn);
912        self.append(record)
913    }
914
915    /// Log a page write
916    pub fn log_write(
917        &mut self,
918        txn_id: TransactionId,
919        prev_lsn: Lsn,
920        page_id: u64,
921        old_data: Vec<u8>,
922        new_data: Vec<u8>,
923    ) -> Result<Lsn> {
924        let record = WalRecord::write(Lsn::ZERO, txn_id, prev_lsn, page_id, old_data, new_data);
925        self.append(record)
926    }
927
928    /// Log a checkpoint
929    pub fn log_checkpoint(&mut self) -> Result<Lsn> {
930        let record = WalRecord::checkpoint(Lsn::ZERO);
931        let lsn = self.append(record)?;
932
933        // Always sync on checkpoint
934        match self.config.sync_mode {
935            WalSyncMode::GroupCommit => {
936                self.flush_group()?;
937            }
938            _ => {
939                self.file.lock().sync_all()?;
940                self.stats.record_fsync();
941            }
942        }
943
944        Ok(lsn)
945    }
946
947    /// Shutdown the WAL gracefully
948    pub fn shutdown(&mut self) -> Result<()> {
949        // Signal shutdown
950        self.shutdown.store(true, Ordering::Relaxed);
951
952        // Wake the flusher so it can flush remaining records and exit
953        self.flush_cond.notify_one();
954
955        // Wait for the flush thread to finish
956        if let Some(handle) = self.flush_thread.take() {
957            handle
958                .join()
959                .map_err(|_| Error::Internal("Flush thread panicked".into()))?;
960        }
961
962        // Final sync
963        self.file.lock().sync_all()?;
964
965        Ok(())
966    }
967}
968
969impl Drop for Wal {
970    fn drop(&mut self) {
971        // Try to shutdown gracefully
972        let _ = self.shutdown();
973    }
974}
975
/// Iterator over WAL records
///
/// Yields records in file order as `Result<WalRecord>`, decoding one
/// length-prefixed frame (4-byte little-endian length, then the serialized
/// record) per call to `next`.
pub struct WalIterator {
    /// Buffered reader over the WAL file, positioned at the next frame's
    /// length prefix.
    reader: BufReader<File>,
}
980
981impl Iterator for WalIterator {
982    type Item = Result<WalRecord>;
983
984    fn next(&mut self) -> Option<Self::Item> {
985        // Read length prefix
986        let mut len_buf = [0u8; 4];
987        match self.reader.read_exact(&mut len_buf) {
988            Ok(_) => {}
989            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return None,
990            Err(e) => return Some(Err(e.into())),
991        }
992
993        let record_len = u32::from_le_bytes(len_buf) as usize;
994
995        // Read record data
996        let mut buf = vec![0u8; record_len];
997        match self.reader.read_exact(&mut buf) {
998            Ok(_) => {}
999            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return None,
1000            Err(e) => return Some(Err(e.into())),
1001        }
1002
1003        Some(WalRecord::deserialize(&buf))
1004    }
1005}
1006
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_wal_record_serialization() {
        let record = WalRecord::write(
            Lsn(1),
            TransactionId(100),
            Lsn::ZERO,
            42,
            vec![1, 2, 3],
            vec![4, 5, 6],
        );

        let serialized = record.serialize();
        let deserialized = WalRecord::deserialize(&serialized).unwrap();

        assert_eq!(record.lsn, deserialized.lsn);
        assert_eq!(record.txn_id, deserialized.txn_id);
        assert_eq!(record.record_type, deserialized.record_type);
        assert_eq!(record.page_id, deserialized.page_id);
        assert_eq!(record.old_data, deserialized.old_data);
        assert_eq!(record.new_data, deserialized.new_data);
    }

    #[test]
    fn test_wal_append_iterate() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test.wal");

        let mut wal = Wal::open(&wal_path).unwrap();

        // Append some records
        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        let lsn2 = wal
            .log_write(TransactionId(1), lsn1, 10, vec![0], vec![1])
            .unwrap();
        let _lsn3 = wal.log_commit(TransactionId(1), lsn2).unwrap();

        // Iterate
        let records: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(records.len(), 3);

        assert_eq!(
            records[0].as_ref().unwrap().record_type,
            WalRecordType::Begin
        );
        assert_eq!(
            records[1].as_ref().unwrap().record_type,
            WalRecordType::Write
        );
        assert_eq!(
            records[2].as_ref().unwrap().record_type,
            WalRecordType::Commit
        );
    }

    #[test]
    fn test_wal_recovery() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test.wal");

        // Write some records
        {
            let mut wal = Wal::open(&wal_path).unwrap();
            wal.log_begin(TransactionId(1)).unwrap();
            wal.sync().unwrap();
        }

        // Reopen and check
        {
            let wal = Wal::open(&wal_path).unwrap();
            let records: Vec<_> = wal.iter().unwrap().collect();
            assert_eq!(records.len(), 1);
        }
    }

    #[test]
    fn test_wal_iter_stops_at_torn_tail() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_torn.wal");

        let mut wal = Wal::open(&wal_path).unwrap();
        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        wal.log_commit(TransactionId(1), lsn1).unwrap();
        wal.sync().unwrap();

        // Simulate a crash mid-append: a length prefix promising 1024 bytes
        // followed by only 3 bytes of payload.
        {
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            let mut torn = 1024u32.to_le_bytes().to_vec();
            torn.extend_from_slice(&[1, 2, 3]);
            std::io::Write::write_all(&mut file, &torn).unwrap();
        }

        // The complete records are still readable; the torn frame ends iteration.
        let records: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(records.len(), 2);
        assert!(records.iter().all(|r| r.is_ok()));

        wal.shutdown().unwrap();
    }

    #[test]
    fn test_wal_checkpoint() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_checkpoint.wal");

        let mut wal = Wal::open(&wal_path).unwrap();
        let fsyncs_before = wal.stats().fsync_count();

        wal.log_checkpoint().unwrap();

        let records: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(records.len(), 1);
        assert_eq!(
            records[0].as_ref().unwrap().record_type,
            WalRecordType::Checkpoint
        );
        // Checkpoints are always synced.
        assert!(wal.stats().fsync_count() > fsyncs_before);
    }

    #[test]
    fn test_wal_group_commit() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_group.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::GroupCommit,
            group_commit_interval_ms: 50,
            group_commit_max_batch: 10,
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();

        // Write multiple transactions
        for i in 1..=5 {
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            let lsn2 = wal
                .log_write(txn_id, lsn1, i * 10, vec![0], vec![1])
                .unwrap();
            wal.log_commit(txn_id, lsn2).unwrap();
        }

        // Verify all records are written
        let records: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(records.len(), 15); // 5 transactions * 3 records each

        // Check stats
        let stats = wal.stats();
        assert!(stats.group_commits() >= 1);
        assert_eq!(stats.total_records(), 15);

        // Shutdown gracefully
        wal.shutdown().unwrap();
    }

    #[test]
    fn test_wal_shutdown_is_repeatable() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_shutdown_twice.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::GroupCommit,
            group_commit_interval_ms: 50,
            group_commit_max_batch: 10,
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();
        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        wal.log_commit(TransactionId(1), lsn1).unwrap();

        // A second explicit call (and the implicit one from Drop) must not fail.
        wal.shutdown().unwrap();
        wal.shutdown().unwrap();
    }

    #[test]
    fn test_wal_no_sync_mode() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_nosync.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::NoSync,
            ..Default::default()
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();

        // Write some records
        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        let lsn2 = wal
            .log_write(TransactionId(1), lsn1, 10, vec![0], vec![1])
            .unwrap();
        wal.log_commit(TransactionId(1), lsn2).unwrap();

        // No fsyncs should have been done for commit
        let stats = wal.stats();
        assert_eq!(stats.fsync_count(), 0);

        // Manual sync
        wal.sync().unwrap();
        assert_eq!(stats.fsync_count(), 1);
    }

    #[test]
    fn test_wal_stats() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_stats.wal");

        let mut wal = Wal::open(&wal_path).unwrap();

        let lsn1 = wal.log_begin(TransactionId(1)).unwrap();
        let lsn2 = wal
            .log_write(TransactionId(1), lsn1, 10, vec![0], vec![1])
            .unwrap();
        wal.log_commit(TransactionId(1), lsn2).unwrap();

        let stats = wal.stats();
        assert_eq!(stats.total_records(), 3);
        // In immediate mode, we sync on every commit
        assert!(stats.fsync_count() >= 1);
    }

    #[test]
    fn test_wal_batch_flush() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_batch.wal");

        let config = WalGroupCommitConfig {
            sync_mode: WalSyncMode::GroupCommit,
            group_commit_interval_ms: 1000, // Long interval
            group_commit_max_batch: 5,      // Small batch size
        };

        let mut wal = Wal::open_with_config(&wal_path, config, None).unwrap();

        // Three begin/commit pairs: 6 records in total against a batch size of 5
        for i in 1..=3 {
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            wal.log_commit(txn_id, lsn1).unwrap();
        }

        // Every committed record must be visible in the file once the commits return
        let records: Vec<_> = wal.iter().unwrap().collect();
        assert_eq!(records.len(), 6); // 3 transactions * 2 records each

        wal.shutdown().unwrap();
    }

    #[test]
    fn test_wal_size_limit() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_limited.wal");

        // Create WAL with a small size limit (10KB)
        let max_size = Some(10 * 1024);
        let mut wal =
            Wal::open_with_config(&wal_path, WalGroupCommitConfig::default(), max_size).unwrap();

        // Write records until we hit the limit. The single bound below makes a
        // missing limit check fail the test instead of looping forever.
        let mut count = 0;
        loop {
            match wal.log_begin(TransactionId(count + 1)) {
                Ok(_) => {
                    count += 1;
                    assert!(count < 1000, "Should have hit WAL limit before 1000 records");
                }
                // Expected error
                Err(Error::WalLimitExceeded { .. }) => break,
                Err(e) => panic!("Unexpected error: {}", e),
            }
        }

        assert!(count > 0, "Should have written at least some records");

        wal.shutdown().unwrap();
    }

    #[test]
    fn test_wal_unlimited() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_unlimited.wal");

        // Create WAL with no size limit
        let mut wal =
            Wal::open_with_config(&wal_path, WalGroupCommitConfig::default(), None).unwrap();

        // Write many records - should not hit limit
        for i in 1..=100 {
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            wal.log_commit(txn_id, lsn1).unwrap();
        }

        wal.shutdown().unwrap();
    }

    #[test]
    fn test_wal_truncate_resets_size() {
        let tmp = TempDir::new().unwrap();
        let wal_path = tmp.path().join("test_truncate.wal");

        let max_size = Some(100 * 1024); // 100KB
        let mut wal =
            Wal::open_with_config(&wal_path, WalGroupCommitConfig::default(), max_size).unwrap();

        // Write some records
        for i in 1..=10 {
            let txn_id = TransactionId(i);
            let lsn1 = wal.log_begin(txn_id).unwrap();
            wal.log_commit(txn_id, lsn1).unwrap();
        }

        let size_before = wal.current_size.load(Ordering::Relaxed);
        assert!(size_before > 0);

        // Truncate
        wal.truncate().unwrap();

        let size_after = wal.current_size.load(Ordering::Relaxed);
        assert_eq!(size_after, 0);

        // Should be able to write again after truncate
        let lsn1 = wal.log_begin(TransactionId(100)).unwrap();
        wal.log_commit(TransactionId(100), lsn1).unwrap();

        wal.shutdown().unwrap();
    }
}