Skip to main content

reddb_server/storage/transaction/
log.rs

//! Write-Ahead Log (WAL) for Transaction Durability
//!
//! Provides crash recovery through sequential logging.
4
5use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::time::{SystemTime, UNIX_EPOCH};
12
/// Identifier of a transaction.
pub type TxnId = u64;

/// Log Sequence Number identifying a single record in the log.
pub type Lsn = u64;

/// Timestamp stored with each log record.
pub type Timestamp = u64;
21
/// Borrows the next `len` bytes of `data` starting at `*offset`.
///
/// On success the cursor is advanced past the returned slice. If fewer than
/// `len` bytes remain (or the end position would overflow), the cursor is
/// left untouched and an `UnexpectedEof` error carrying `context` is returned.
fn read_bytes<'a>(
    data: &'a [u8],
    offset: &mut usize,
    len: usize,
    context: &'static str,
) -> io::Result<&'a [u8]> {
    let start = *offset;
    let chunk = start
        .checked_add(len)
        .and_then(|stop| data.get(start..stop));

    match chunk {
        Some(bytes) => {
            // `start + len` cannot overflow here: `checked_add` succeeded above.
            *offset = start + len;
            Ok(bytes)
        }
        None => Err(io::Error::new(io::ErrorKind::UnexpectedEof, context)),
    }
}
36
37fn read_array<const N: usize>(
38    data: &[u8],
39    offset: &mut usize,
40    context: &'static str,
41) -> io::Result<[u8; N]> {
42    let bytes = read_bytes(data, offset, N, context)?;
43    let mut array = [0u8; N];
44    array.copy_from_slice(bytes);
45    Ok(array)
46}
47
48fn read_u32(data: &[u8], offset: &mut usize, context: &'static str) -> io::Result<u32> {
49    Ok(u32::from_le_bytes(read_array::<4>(data, offset, context)?))
50}
51
52fn read_u64(data: &[u8], offset: &mut usize, context: &'static str) -> io::Result<u64> {
53    Ok(u64::from_le_bytes(read_array::<8>(data, offset, context)?))
54}
55
/// Builds the `io::Error` reported when the lock described by `context`
/// turns out to be poisoned.
fn io_lock_error(context: &'static str) -> io::Error {
    let message = format!("{context} lock poisoned");
    io::Error::new(io::ErrorKind::Other, message)
}
59
60fn io_read_guard<'a, T>(
61    lock: &'a RwLock<T>,
62    context: &'static str,
63) -> io::Result<RwLockReadGuard<'a, T>> {
64    lock.read().map_err(|_| io_lock_error(context))
65}
66
67fn io_write_guard<'a, T>(
68    lock: &'a RwLock<T>,
69    context: &'static str,
70) -> io::Result<RwLockWriteGuard<'a, T>> {
71    lock.write().map_err(|_| io_lock_error(context))
72}
73
74fn io_mutex_guard<'a, T>(
75    lock: &'a Mutex<T>,
76    context: &'static str,
77) -> io::Result<MutexGuard<'a, T>> {
78    lock.lock().map_err(|_| io_lock_error(context))
79}
80
/// Acquires a shared guard on `lock` and never fails: if the lock is
/// poisoned, the guard is recovered from the poison error and returned anyway.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
87
88/// Log entry types
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub enum LogEntryType {
91    /// Transaction begin
92    Begin,
93    /// Transaction commit
94    Commit,
95    /// Transaction abort
96    Abort,
97    /// Data insert
98    Insert { key: Vec<u8>, value: Vec<u8> },
99    /// Data update (before and after images)
100    Update {
101        key: Vec<u8>,
102        old_value: Vec<u8>,
103        new_value: Vec<u8>,
104    },
105    /// Data delete
106    Delete { key: Vec<u8>, old_value: Vec<u8> },
107    /// Checkpoint marker
108    Checkpoint { active_txns: Vec<TxnId> },
109    /// Savepoint creation
110    Savepoint { name: String },
111    /// Savepoint rollback
112    RollbackToSavepoint { name: String },
113    /// Compensation log record (for undo)
114    Compensate { original_lsn: Lsn },
115    /// End of transaction (after all cleanup)
116    End,
117}
118
119impl LogEntryType {
120    /// Check if this is a commit record
121    pub fn is_commit(&self) -> bool {
122        matches!(self, LogEntryType::Commit)
123    }
124
125    /// Check if this is an abort record
126    pub fn is_abort(&self) -> bool {
127        matches!(self, LogEntryType::Abort)
128    }
129
130    /// Check if this is a data modification
131    pub fn is_data_modification(&self) -> bool {
132        matches!(
133            self,
134            LogEntryType::Insert { .. } | LogEntryType::Update { .. } | LogEntryType::Delete { .. }
135        )
136    }
137
138    /// Serialize to bytes
139    pub fn to_bytes(&self) -> Vec<u8> {
140        let mut buf = Vec::new();
141
142        match self {
143            LogEntryType::Begin => buf.push(0),
144            LogEntryType::Commit => buf.push(1),
145            LogEntryType::Abort => buf.push(2),
146            LogEntryType::Insert { key, value } => {
147                buf.push(3);
148                buf.extend(&(key.len() as u32).to_le_bytes());
149                buf.extend(key);
150                buf.extend(&(value.len() as u32).to_le_bytes());
151                buf.extend(value);
152            }
153            LogEntryType::Update {
154                key,
155                old_value,
156                new_value,
157            } => {
158                buf.push(4);
159                buf.extend(&(key.len() as u32).to_le_bytes());
160                buf.extend(key);
161                buf.extend(&(old_value.len() as u32).to_le_bytes());
162                buf.extend(old_value);
163                buf.extend(&(new_value.len() as u32).to_le_bytes());
164                buf.extend(new_value);
165            }
166            LogEntryType::Delete { key, old_value } => {
167                buf.push(5);
168                buf.extend(&(key.len() as u32).to_le_bytes());
169                buf.extend(key);
170                buf.extend(&(old_value.len() as u32).to_le_bytes());
171                buf.extend(old_value);
172            }
173            LogEntryType::Checkpoint { active_txns } => {
174                buf.push(6);
175                buf.extend(&(active_txns.len() as u32).to_le_bytes());
176                for txn in active_txns {
177                    buf.extend(&txn.to_le_bytes());
178                }
179            }
180            LogEntryType::Savepoint { name } => {
181                buf.push(7);
182                let name_bytes = name.as_bytes();
183                buf.extend(&(name_bytes.len() as u32).to_le_bytes());
184                buf.extend(name_bytes);
185            }
186            LogEntryType::RollbackToSavepoint { name } => {
187                buf.push(8);
188                let name_bytes = name.as_bytes();
189                buf.extend(&(name_bytes.len() as u32).to_le_bytes());
190                buf.extend(name_bytes);
191            }
192            LogEntryType::Compensate { original_lsn } => {
193                buf.push(9);
194                buf.extend(&original_lsn.to_le_bytes());
195            }
196            LogEntryType::End => buf.push(10),
197        }
198
199        buf
200    }
201
202    /// Deserialize from bytes
203    pub fn from_bytes(data: &[u8]) -> io::Result<(Self, usize)> {
204        if data.is_empty() {
205            return Err(io::Error::new(io::ErrorKind::InvalidData, "Empty data"));
206        }
207
208        let mut offset = 0;
209        let tag = read_bytes(data, &mut offset, 1, "Missing log entry tag")?[0];
210
211        let entry = match tag {
212            0 => LogEntryType::Begin,
213            1 => LogEntryType::Commit,
214            2 => LogEntryType::Abort,
215            3 => {
216                // Insert
217                let key_len =
218                    read_u32(data, &mut offset, "Missing WAL insert key length")? as usize;
219                let key =
220                    read_bytes(data, &mut offset, key_len, "Truncated WAL insert key")?.to_vec();
221                let value_len =
222                    read_u32(data, &mut offset, "Missing WAL insert value length")? as usize;
223                let value = read_bytes(data, &mut offset, value_len, "Truncated WAL insert value")?
224                    .to_vec();
225                LogEntryType::Insert { key, value }
226            }
227            4 => {
228                // Update
229                let key_len =
230                    read_u32(data, &mut offset, "Missing WAL update key length")? as usize;
231                let key =
232                    read_bytes(data, &mut offset, key_len, "Truncated WAL update key")?.to_vec();
233                let old_len =
234                    read_u32(data, &mut offset, "Missing WAL update old value length")? as usize;
235                let old_value =
236                    read_bytes(data, &mut offset, old_len, "Truncated WAL update old value")?
237                        .to_vec();
238                let new_len =
239                    read_u32(data, &mut offset, "Missing WAL update new value length")? as usize;
240                let new_value =
241                    read_bytes(data, &mut offset, new_len, "Truncated WAL update new value")?
242                        .to_vec();
243                LogEntryType::Update {
244                    key,
245                    old_value,
246                    new_value,
247                }
248            }
249            5 => {
250                // Delete
251                let key_len =
252                    read_u32(data, &mut offset, "Missing WAL delete key length")? as usize;
253                let key =
254                    read_bytes(data, &mut offset, key_len, "Truncated WAL delete key")?.to_vec();
255                let old_len =
256                    read_u32(data, &mut offset, "Missing WAL delete old value length")? as usize;
257                let old_value =
258                    read_bytes(data, &mut offset, old_len, "Truncated WAL delete old value")?
259                        .to_vec();
260                LogEntryType::Delete { key, old_value }
261            }
262            6 => {
263                // Checkpoint
264                let count =
265                    read_u32(data, &mut offset, "Missing WAL checkpoint txn count")? as usize;
266                let mut active_txns = Vec::with_capacity(count);
267                for _ in 0..count {
268                    let txn =
269                        read_u64(data, &mut offset, "Truncated WAL checkpoint transaction id")?;
270                    active_txns.push(txn);
271                }
272                LogEntryType::Checkpoint { active_txns }
273            }
274            7 => {
275                // Savepoint
276                let name_len =
277                    read_u32(data, &mut offset, "Missing WAL savepoint name length")? as usize;
278                let name = String::from_utf8_lossy(read_bytes(
279                    data,
280                    &mut offset,
281                    name_len,
282                    "Truncated WAL savepoint name",
283                )?)
284                .to_string();
285                LogEntryType::Savepoint { name }
286            }
287            8 => {
288                // RollbackToSavepoint
289                let name_len = read_u32(
290                    data,
291                    &mut offset,
292                    "Missing WAL rollback-to-savepoint name length",
293                )? as usize;
294                let name = String::from_utf8_lossy(read_bytes(
295                    data,
296                    &mut offset,
297                    name_len,
298                    "Truncated WAL rollback-to-savepoint name",
299                )?)
300                .to_string();
301                LogEntryType::RollbackToSavepoint { name }
302            }
303            9 => {
304                // Compensate
305                let original_lsn =
306                    read_u64(data, &mut offset, "Truncated WAL compensate original LSN")?;
307                LogEntryType::Compensate { original_lsn }
308            }
309            10 => LogEntryType::End,
310            _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid tag")),
311        };
312
313        Ok((entry, offset))
314    }
315}
316
317/// A single log entry
318#[derive(Debug, Clone)]
319pub struct LogEntry {
320    /// Log sequence number
321    pub lsn: Lsn,
322    /// Transaction ID
323    pub txn_id: TxnId,
324    /// Previous LSN for this transaction (for undo chain)
325    pub prev_lsn: Option<Lsn>,
326    /// Timestamp
327    pub timestamp: Timestamp,
328    /// Entry type
329    pub entry_type: LogEntryType,
330}
331
332impl LogEntry {
333    /// Create new log entry
334    pub fn new(txn_id: TxnId, prev_lsn: Option<Lsn>, entry_type: LogEntryType) -> Self {
335        Self {
336            lsn: 0, // Will be assigned by log
337            txn_id,
338            prev_lsn,
339            timestamp: SystemTime::now()
340                .duration_since(UNIX_EPOCH)
341                .unwrap_or_default()
342                .as_micros() as Timestamp,
343            entry_type,
344        }
345    }
346
347    /// Serialize to bytes
348    pub fn to_bytes(&self) -> Vec<u8> {
349        let mut buf = Vec::new();
350
351        // Header: lsn(8) + txn_id(8) + prev_lsn(8) + timestamp(8) = 32 bytes
352        buf.extend(&self.lsn.to_le_bytes());
353        buf.extend(&self.txn_id.to_le_bytes());
354        buf.extend(&self.prev_lsn.unwrap_or(0).to_le_bytes());
355        buf.extend(&self.timestamp.to_le_bytes());
356
357        // Entry type
358        let type_bytes = self.entry_type.to_bytes();
359        buf.extend(&(type_bytes.len() as u32).to_le_bytes());
360        buf.extend(&type_bytes);
361
362        // Checksum (simple XOR for demo)
363        let checksum: u8 = buf.iter().fold(0, |acc, &b| acc ^ b);
364        buf.push(checksum);
365
366        buf
367    }
368
369    /// Deserialize from bytes
370    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
371        if data.len() < 37 {
372            // 32 header + 4 type_len + 1 checksum minimum
373            return Err(io::Error::new(io::ErrorKind::InvalidData, "Too short"));
374        }
375
376        let mut offset = 0;
377        let lsn = read_u64(data, &mut offset, "Missing WAL entry LSN")?;
378        let txn_id = read_u64(data, &mut offset, "Missing WAL entry txn id")?;
379        let prev_lsn_raw = read_u64(data, &mut offset, "Missing WAL entry prev_lsn")?;
380        let prev_lsn = if prev_lsn_raw == 0 {
381            None
382        } else {
383            Some(prev_lsn_raw)
384        };
385        let timestamp = read_u64(data, &mut offset, "Missing WAL entry timestamp")?;
386        let type_len = read_u32(data, &mut offset, "Missing WAL entry type length")? as usize;
387        let entry_type_bytes = read_bytes(
388            data,
389            &mut offset,
390            type_len,
391            "Truncated WAL entry type bytes",
392        )?;
393        let (entry_type, consumed) = LogEntryType::from_bytes(entry_type_bytes)?;
394        if consumed != entry_type_bytes.len() {
395            return Err(io::Error::new(
396                io::ErrorKind::InvalidData,
397                "WAL entry type length mismatch",
398            ));
399        }
400
401        // Verify checksum
402        let stored_checksum = *data.get(offset).ok_or_else(|| {
403            io::Error::new(io::ErrorKind::UnexpectedEof, "Missing WAL entry checksum")
404        })?;
405        let computed: u8 = data[..offset].iter().fold(0, |acc, &b| acc ^ b);
406        if stored_checksum != computed {
407            return Err(io::Error::new(
408                io::ErrorKind::InvalidData,
409                "Checksum mismatch",
410            ));
411        }
412
413        Ok(Self {
414            lsn,
415            txn_id,
416            prev_lsn,
417            timestamp,
418            entry_type,
419        })
420    }
421
422    /// Get the size of this entry when serialized
423    pub fn serialized_size(&self) -> usize {
424        32 + 4 + self.entry_type.to_bytes().len() + 1
425    }
426}
427
/// Settings that control how the write-ahead log is stored and synced.
#[derive(Clone, Debug)]
pub struct WalConfig {
    /// Location of the log file. An empty path means no file is opened.
    pub path: PathBuf,
    /// When `true`, the log is flushed and fsynced after each commit record.
    pub sync_on_commit: bool,
    /// Capacity, in bytes, of the buffered writer wrapping the log file.
    pub buffer_size: usize,
    /// Maximum log file size before rotation.
    /// NOTE(review): nothing in this module reads this value — confirm where
    /// rotation is implemented.
    pub max_file_size: u64,
    /// Checkpoint interval (in entries).
    /// NOTE(review): nothing in this module reads this value — presumably the
    /// caller decides when to checkpoint; verify.
    pub checkpoint_interval: u64,
}
442
443impl Default for WalConfig {
444    fn default() -> Self {
445        Self {
446            path: PathBuf::from("wal.log"),
447            sync_on_commit: true,
448            buffer_size: 64 * 1024,           // 64KB
449            max_file_size: 100 * 1024 * 1024, // 100MB
450            checkpoint_interval: 1000,
451        }
452    }
453}
454
455impl WalConfig {
456    /// Create config with path
457    pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
458        Self {
459            path: path.as_ref().to_path_buf(),
460            ..Default::default()
461        }
462    }
463}
464
/// Counters describing the activity of a transaction log. All start at zero.
#[derive(Clone, Debug, Default)]
pub struct WalStats {
    /// Number of entries appended.
    pub entries_written: u64,
    /// Bytes appended, counting each entry's 4-byte length prefix.
    pub bytes_written: u64,
    /// Number of syncs recorded by the log.
    pub syncs: u64,
    /// Number of checkpoints performed.
    pub checkpoints: u64,
    /// Size of the log file as tracked by the log, in bytes.
    pub file_size: u64,
}
479
480/// Transaction Log (WAL)
481pub struct TransactionLog {
482    /// Configuration
483    config: WalConfig,
484    /// Next LSN to assign
485    next_lsn: AtomicU64,
486    /// Log file (optional for in-memory mode)
487    file: Option<Mutex<BufWriter<File>>>,
488    /// In-memory buffer for entries
489    buffer: RwLock<VecDeque<LogEntry>>,
490    /// Transaction prev_lsn tracking
491    txn_prev_lsn: RwLock<std::collections::HashMap<TxnId, Lsn>>,
492    /// Statistics
493    stats: RwLock<WalStats>,
494    /// Last checkpoint LSN
495    last_checkpoint_lsn: AtomicU64,
496}
497
498impl TransactionLog {
499    /// Create new transaction log
500    pub fn new(config: WalConfig) -> io::Result<Self> {
501        let file = if config.path.as_os_str().is_empty() {
502            None
503        } else {
504            let f = OpenOptions::new()
505                .create(true)
506                .append(true)
507                .read(true)
508                .open(&config.path)?;
509            Some(Mutex::new(BufWriter::with_capacity(config.buffer_size, f)))
510        };
511
512        Ok(Self {
513            config,
514            next_lsn: AtomicU64::new(1),
515            file,
516            buffer: RwLock::new(VecDeque::new()),
517            txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
518            stats: RwLock::new(WalStats::default()),
519            last_checkpoint_lsn: AtomicU64::new(0),
520        })
521    }
522
523    /// Create in-memory log (no persistence)
524    pub fn in_memory() -> Self {
525        Self {
526            config: WalConfig {
527                path: PathBuf::new(),
528                ..Default::default()
529            },
530            next_lsn: AtomicU64::new(1),
531            file: None,
532            buffer: RwLock::new(VecDeque::new()),
533            txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
534            stats: RwLock::new(WalStats::default()),
535            last_checkpoint_lsn: AtomicU64::new(0),
536        }
537    }
538
539    /// Append entry to log
540    pub fn append(&self, mut entry: LogEntry) -> io::Result<Lsn> {
541        // Assign LSN
542        let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
543        entry.lsn = lsn;
544
545        // Update prev_lsn tracking
546        {
547            let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
548            entry.prev_lsn = prev_lsns.get(&entry.txn_id).copied();
549            prev_lsns.insert(entry.txn_id, lsn);
550        }
551
552        let bytes = entry.to_bytes();
553
554        // Write to file if available
555        if let Some(ref file) = self.file {
556            let mut writer = io_mutex_guard(file, "wal file")?;
557            // Write length prefix
558            writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
559            writer.write_all(&bytes)?;
560
561            // Sync on commit if configured
562            if self.config.sync_on_commit && entry.entry_type.is_commit() {
563                writer.flush()?;
564                writer.get_mut().sync_all()?;
565
566                let mut stats = io_write_guard(&self.stats, "wal stats")?;
567                stats.syncs += 1;
568            }
569        }
570
571        // Store in buffer
572        {
573            let mut buffer = io_write_guard(&self.buffer, "wal buffer")?;
574            buffer.push_back(entry);
575
576            // Limit buffer size
577            while buffer.len() > 10000 {
578                buffer.pop_front();
579            }
580        }
581
582        // Update stats
583        {
584            let mut stats = io_write_guard(&self.stats, "wal stats")?;
585            stats.entries_written += 1;
586            stats.bytes_written += bytes.len() as u64 + 4;
587            stats.file_size += bytes.len() as u64 + 4;
588        }
589
590        Ok(lsn)
591    }
592
593    /// Log transaction begin
594    pub fn log_begin(&self, txn_id: TxnId) -> io::Result<Lsn> {
595        self.append(LogEntry::new(txn_id, None, LogEntryType::Begin))
596    }
597
598    /// Log transaction commit
599    pub fn log_commit(&self, txn_id: TxnId) -> io::Result<Lsn> {
600        let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Commit))?;
601
602        // Clean up prev_lsn tracking
603        {
604            let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
605            prev_lsns.remove(&txn_id);
606        }
607
608        Ok(lsn)
609    }
610
611    /// Log transaction abort
612    pub fn log_abort(&self, txn_id: TxnId) -> io::Result<Lsn> {
613        let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Abort))?;
614
615        // Clean up prev_lsn tracking
616        {
617            let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
618            prev_lsns.remove(&txn_id);
619        }
620
621        Ok(lsn)
622    }
623
624    /// Log insert operation
625    pub fn log_insert(&self, txn_id: TxnId, key: Vec<u8>, value: Vec<u8>) -> io::Result<Lsn> {
626        self.append(LogEntry::new(
627            txn_id,
628            None,
629            LogEntryType::Insert { key, value },
630        ))
631    }
632
633    /// Log update operation
634    pub fn log_update(
635        &self,
636        txn_id: TxnId,
637        key: Vec<u8>,
638        old_value: Vec<u8>,
639        new_value: Vec<u8>,
640    ) -> io::Result<Lsn> {
641        self.append(LogEntry::new(
642            txn_id,
643            None,
644            LogEntryType::Update {
645                key,
646                old_value,
647                new_value,
648            },
649        ))
650    }
651
652    /// Log delete operation
653    pub fn log_delete(&self, txn_id: TxnId, key: Vec<u8>, old_value: Vec<u8>) -> io::Result<Lsn> {
654        self.append(LogEntry::new(
655            txn_id,
656            None,
657            LogEntryType::Delete { key, old_value },
658        ))
659    }
660
661    /// Log savepoint
662    pub fn log_savepoint(&self, txn_id: TxnId, name: String) -> io::Result<Lsn> {
663        self.append(LogEntry::new(
664            txn_id,
665            None,
666            LogEntryType::Savepoint { name },
667        ))
668    }
669
670    /// Write checkpoint
671    pub fn checkpoint(&self, active_txns: Vec<TxnId>) -> io::Result<Lsn> {
672        let lsn = self.append(LogEntry::new(
673            0, // System transaction
674            None,
675            LogEntryType::Checkpoint { active_txns },
676        ))?;
677
678        // Force sync
679        if let Some(ref file) = self.file {
680            let mut writer = io_mutex_guard(file, "wal file")?;
681            writer.flush()?;
682            writer.get_mut().sync_all()?;
683        }
684
685        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
686
687        {
688            let mut stats = io_write_guard(&self.stats, "wal stats")?;
689            stats.checkpoints += 1;
690        }
691
692        Ok(lsn)
693    }
694
695    /// Flush buffer to disk
696    pub fn flush(&self) -> io::Result<()> {
697        if let Some(ref file) = self.file {
698            let mut writer = io_mutex_guard(file, "wal file")?;
699            writer.flush()?;
700            writer.get_mut().sync_all()?;
701        }
702        Ok(())
703    }
704
705    /// Get entries for a transaction (for undo)
706    pub fn get_txn_entries(&self, txn_id: TxnId) -> Vec<LogEntry> {
707        let buffer = recover_read_guard(&self.buffer);
708        buffer
709            .iter()
710            .filter(|e| e.txn_id == txn_id)
711            .cloned()
712            .collect()
713    }
714
715    /// Get entries since LSN
716    pub fn get_entries_since(&self, lsn: Lsn) -> Vec<LogEntry> {
717        let buffer = recover_read_guard(&self.buffer);
718        buffer.iter().filter(|e| e.lsn >= lsn).cloned().collect()
719    }
720
721    /// Get current LSN
722    pub fn current_lsn(&self) -> Lsn {
723        self.next_lsn.load(Ordering::SeqCst) - 1
724    }
725
726    /// Get last checkpoint LSN
727    pub fn last_checkpoint(&self) -> Lsn {
728        self.last_checkpoint_lsn.load(Ordering::SeqCst)
729    }
730
731    /// Get statistics
732    pub fn stats(&self) -> WalStats {
733        recover_read_guard(&self.stats).clone()
734    }
735
736    /// Get configuration
737    pub fn config(&self) -> &WalConfig {
738        &self.config
739    }
740}
741
/// Sequential reader over a log file, used during recovery.
pub struct LogReader {
    /// Buffered handle on the log file being read.
    reader: BufReader<File>,
}
746
747impl LogReader {
748    /// Open log file for reading
749    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
750        let file = File::open(path)?;
751        Ok(Self {
752            reader: BufReader::new(file),
753        })
754    }
755
756    /// Read all entries
757    pub fn read_all(&mut self) -> io::Result<Vec<LogEntry>> {
758        let mut entries = Vec::new();
759
760        loop {
761            match self.read_entry() {
762                Ok(entry) => entries.push(entry),
763                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
764                Err(e) => return Err(e),
765            }
766        }
767
768        Ok(entries)
769    }
770
771    /// Read single entry
772    pub fn read_entry(&mut self) -> io::Result<LogEntry> {
773        let mut len_buf = [0u8; 4];
774        self.reader.read_exact(&mut len_buf)?;
775        let len = u32::from_le_bytes(len_buf) as usize;
776
777        let mut data = vec![0u8; len];
778        self.reader.read_exact(&mut data)?;
779
780        LogEntry::from_bytes(&data)
781    }
782}
783
#[cfg(test)]
mod tests {
    use super::*;

    /// A serialized entry must decode back to the same header and payload.
    #[test]
    fn test_log_entry_serialize() {
        let entry = LogEntry {
            lsn: 42,
            txn_id: 1,
            prev_lsn: Some(40),
            timestamp: 1234567890,
            entry_type: LogEntryType::Insert {
                key: b"key1".to_vec(),
                value: b"value1".to_vec(),
            },
        };

        let bytes = entry.to_bytes();
        let recovered = LogEntry::from_bytes(&bytes).unwrap();

        assert_eq!(bytes.len(), entry.serialized_size());
        assert_eq!(recovered.lsn, entry.lsn);
        assert_eq!(recovered.txn_id, entry.txn_id);
        assert_eq!(recovered.prev_lsn, entry.prev_lsn);
        assert_eq!(recovered.timestamp, entry.timestamp);
        assert_eq!(recovered.entry_type, entry.entry_type);
    }

    /// LSNs are handed out sequentially starting at 1.
    #[test]
    fn test_in_memory_log() {
        let log = TransactionLog::in_memory();

        let lsn1 = log.log_begin(1).unwrap();
        let lsn2 = log
            .log_insert(1, b"key".to_vec(), b"value".to_vec())
            .unwrap();
        let lsn3 = log.log_commit(1).unwrap();

        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);
        assert_eq!(lsn3, 3);

        let entries = log.get_txn_entries(1);
        assert_eq!(entries.len(), 3);
    }

    /// A checkpoint records its own LSN as the last checkpoint.
    #[test]
    fn test_checkpoint() {
        let log = TransactionLog::in_memory();

        log.log_begin(1).unwrap();
        log.log_begin(2).unwrap();

        let cp_lsn = log.checkpoint(vec![1, 2]).unwrap();
        assert_eq!(log.last_checkpoint(), cp_lsn);
    }

    /// Every variant must round-trip and consume exactly its own bytes.
    #[test]
    fn test_log_entry_types() {
        let types = vec![
            LogEntryType::Begin,
            LogEntryType::Commit,
            LogEntryType::Abort,
            LogEntryType::Insert {
                key: b"k".to_vec(),
                value: b"v".to_vec(),
            },
            LogEntryType::Update {
                key: b"k".to_vec(),
                old_value: b"old".to_vec(),
                new_value: b"new".to_vec(),
            },
            LogEntryType::Delete {
                key: b"k".to_vec(),
                old_value: b"v".to_vec(),
            },
            LogEntryType::Checkpoint {
                active_txns: vec![1, 2, 3],
            },
            LogEntryType::Savepoint {
                name: "sp1".to_string(),
            },
            LogEntryType::RollbackToSavepoint {
                name: "sp1".to_string(),
            },
            LogEntryType::Compensate { original_lsn: 17 },
            LogEntryType::End,
        ];

        for t in types {
            let bytes = t.to_bytes();
            let (recovered, consumed) = LogEntryType::from_bytes(&bytes).unwrap();
            assert_eq!(recovered, t);
            assert_eq!(consumed, bytes.len());
        }
    }

    /// Each entry of a transaction points back at the previous one.
    #[test]
    fn test_prev_lsn_chain() {
        let log = TransactionLog::in_memory();

        log.log_begin(1).unwrap(); // LSN 1, prev_lsn = None
        log.log_insert(1, b"k1".to_vec(), b"v1".to_vec()).unwrap(); // LSN 2, prev_lsn = 1
        log.log_insert(1, b"k2".to_vec(), b"v2".to_vec()).unwrap(); // LSN 3, prev_lsn = 2

        let entries = log.get_txn_entries(1);
        assert_eq!(entries[0].prev_lsn, None);
        assert_eq!(entries[1].prev_lsn, Some(1));
        assert_eq!(entries[2].prev_lsn, Some(2));
    }

    /// An insert whose key is shorter than its declared length is rejected.
    #[test]
    fn test_log_entry_type_rejects_truncated_insert() {
        let err = LogEntryType::from_bytes(&[3, 4, 0, 0, 0, b'k'])
            .expect_err("truncated insert should fail");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    /// An entry cut short inside its type payload is rejected.
    #[test]
    fn test_log_entry_rejects_truncated_type_payload() {
        let entry = LogEntry {
            lsn: 7,
            txn_id: 9,
            prev_lsn: Some(3),
            timestamp: 42,
            entry_type: LogEntryType::Insert {
                key: b"hello".to_vec(),
                value: b"world".to_vec(),
            },
        };

        let mut bytes = entry.to_bytes();
        bytes.truncate(bytes.len() - 2);

        let err = LogEntry::from_bytes(&bytes).expect_err("truncated entry should fail");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }
}
909
910        let err = LogEntry::from_bytes(&bytes).expect_err("truncated entry should fail");
911        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
912    }
913}