Skip to main content

reddb_server/storage/transaction/
log.rs

1//! Write-Ahead Log (WAL) for Transaction Durability
2//!
3//! Provides crash recovery through sequential logging.
4
5use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::time::{SystemTime, UNIX_EPOCH};
12
/// Identifier of a transaction. `TransactionLog::checkpoint` logs its
/// records under id `0`.
pub type TxnId = u64;

/// Log Sequence Number: the position of a record in the log, assigned by
/// `TransactionLog::append` starting at 1.
pub type Lsn = u64;

/// Timestamp type. `LogEntry::new` fills it with microseconds since the Unix epoch.
pub type Timestamp = u64;
21
/// Builds the `io::Error` reported when the lock described by `context` is poisoned.
///
/// The message has the form `"<context> lock poisoned"`.
fn io_lock_error(context: &'static str) -> io::Error {
    let message = format!("{context} lock poisoned");
    io::Error::other(message)
}
25
26fn io_read_guard<'a, T>(
27    lock: &'a RwLock<T>,
28    context: &'static str,
29) -> io::Result<RwLockReadGuard<'a, T>> {
30    lock.read().map_err(|_| io_lock_error(context))
31}
32
33fn io_write_guard<'a, T>(
34    lock: &'a RwLock<T>,
35    context: &'static str,
36) -> io::Result<RwLockWriteGuard<'a, T>> {
37    lock.write().map_err(|_| io_lock_error(context))
38}
39
40fn io_mutex_guard<'a, T>(
41    lock: &'a Mutex<T>,
42    context: &'static str,
43) -> io::Result<MutexGuard<'a, T>> {
44    lock.lock().map_err(|_| io_lock_error(context))
45}
46
/// Acquires a read guard on `lock`, ignoring poisoning.
///
/// If a writer panicked while holding the lock, the guard is recovered from
/// the poison error so read-only accessors never fail.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    lock.read()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
53
54fn transaction_wal_frame_error(err: reddb_file::RdbFileError) -> io::Error {
55    let message = err.to_string();
56    let kind = if message.contains("missing")
57        || message.contains("empty")
58        || message.contains("truncated")
59        || message.contains("too short")
60    {
61        io::ErrorKind::UnexpectedEof
62    } else {
63        io::ErrorKind::InvalidData
64    };
65    io::Error::new(kind, message)
66}
67
/// Converts a payload decoding error into an `io::Error`.
///
/// Delegates to `transaction_wal_frame_error`, so payload and frame errors
/// share the same error-kind mapping.
fn transaction_wal_payload_error(err: reddb_file::RdbFileError) -> io::Error {
    transaction_wal_frame_error(err)
}
71
/// Kind of a log record together with its kind-specific payload.
///
/// Each variant is mirrored one-to-one by a
/// `reddb_file::TransactionWalEntryPayload` variant, which is the form used
/// for serialization.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogEntryType {
    /// Transaction begin
    Begin,
    /// Transaction commit
    Commit,
    /// Transaction abort
    Abort,
    /// Data insert: the inserted key and value
    Insert { key: Vec<u8>, value: Vec<u8> },
    /// Data update (before and after images)
    Update {
        key: Vec<u8>,
        old_value: Vec<u8>,
        new_value: Vec<u8>,
    },
    /// Data delete: the removed key and the value it held
    Delete { key: Vec<u8>, old_value: Vec<u8> },
    /// Checkpoint marker listing the transactions active at checkpoint time
    Checkpoint { active_txns: Vec<TxnId> },
    /// Savepoint creation
    Savepoint { name: String },
    /// Savepoint rollback
    RollbackToSavepoint { name: String },
    /// Compensation log record (for undo); `original_lsn` is the LSN of the
    /// record being compensated
    Compensate { original_lsn: Lsn },
    /// End of transaction (after all cleanup)
    End,
}
102
103impl LogEntryType {
104    /// Check if this is a commit record
105    pub fn is_commit(&self) -> bool {
106        matches!(self, LogEntryType::Commit)
107    }
108
109    /// Check if this is an abort record
110    pub fn is_abort(&self) -> bool {
111        matches!(self, LogEntryType::Abort)
112    }
113
114    /// Check if this is a data modification
115    pub fn is_data_modification(&self) -> bool {
116        matches!(
117            self,
118            LogEntryType::Insert { .. } | LogEntryType::Update { .. } | LogEntryType::Delete { .. }
119        )
120    }
121
122    /// Serialize to bytes
123    pub fn to_bytes(&self) -> Vec<u8> {
124        reddb_file::encode_transaction_wal_entry_payload(&self.to_file_payload())
125    }
126
127    /// Deserialize from bytes
128    pub fn from_bytes(data: &[u8]) -> io::Result<(Self, usize)> {
129        let (payload, consumed) = reddb_file::decode_transaction_wal_entry_payload(data)
130            .map_err(transaction_wal_payload_error)?;
131        Ok((Self::from_file_payload(payload), consumed))
132    }
133
134    fn to_file_payload(&self) -> reddb_file::TransactionWalEntryPayload {
135        match self {
136            LogEntryType::Begin => reddb_file::TransactionWalEntryPayload::Begin,
137            LogEntryType::Commit => reddb_file::TransactionWalEntryPayload::Commit,
138            LogEntryType::Abort => reddb_file::TransactionWalEntryPayload::Abort,
139            LogEntryType::Insert { key, value } => reddb_file::TransactionWalEntryPayload::Insert {
140                key: key.clone(),
141                value: value.clone(),
142            },
143            LogEntryType::Update {
144                key,
145                old_value,
146                new_value,
147            } => reddb_file::TransactionWalEntryPayload::Update {
148                key: key.clone(),
149                old_value: old_value.clone(),
150                new_value: new_value.clone(),
151            },
152            LogEntryType::Delete { key, old_value } => {
153                reddb_file::TransactionWalEntryPayload::Delete {
154                    key: key.clone(),
155                    old_value: old_value.clone(),
156                }
157            }
158            LogEntryType::Checkpoint { active_txns } => {
159                reddb_file::TransactionWalEntryPayload::Checkpoint {
160                    active_txns: active_txns.clone(),
161                }
162            }
163            LogEntryType::Savepoint { name } => {
164                reddb_file::TransactionWalEntryPayload::Savepoint { name: name.clone() }
165            }
166            LogEntryType::RollbackToSavepoint { name } => {
167                reddb_file::TransactionWalEntryPayload::RollbackToSavepoint { name: name.clone() }
168            }
169            LogEntryType::Compensate { original_lsn } => {
170                reddb_file::TransactionWalEntryPayload::Compensate {
171                    original_lsn: *original_lsn,
172                }
173            }
174            LogEntryType::End => reddb_file::TransactionWalEntryPayload::End,
175        }
176    }
177
178    fn from_file_payload(payload: reddb_file::TransactionWalEntryPayload) -> Self {
179        match payload {
180            reddb_file::TransactionWalEntryPayload::Begin => LogEntryType::Begin,
181            reddb_file::TransactionWalEntryPayload::Commit => LogEntryType::Commit,
182            reddb_file::TransactionWalEntryPayload::Abort => LogEntryType::Abort,
183            reddb_file::TransactionWalEntryPayload::Insert { key, value } => {
184                LogEntryType::Insert { key, value }
185            }
186            reddb_file::TransactionWalEntryPayload::Update {
187                key,
188                old_value,
189                new_value,
190            } => LogEntryType::Update {
191                key,
192                old_value,
193                new_value,
194            },
195            reddb_file::TransactionWalEntryPayload::Delete { key, old_value } => {
196                LogEntryType::Delete { key, old_value }
197            }
198            reddb_file::TransactionWalEntryPayload::Checkpoint { active_txns } => {
199                LogEntryType::Checkpoint { active_txns }
200            }
201            reddb_file::TransactionWalEntryPayload::Savepoint { name } => {
202                LogEntryType::Savepoint { name }
203            }
204            reddb_file::TransactionWalEntryPayload::RollbackToSavepoint { name } => {
205                LogEntryType::RollbackToSavepoint { name }
206            }
207            reddb_file::TransactionWalEntryPayload::Compensate { original_lsn } => {
208                LogEntryType::Compensate { original_lsn }
209            }
210            reddb_file::TransactionWalEntryPayload::End => LogEntryType::End,
211        }
212    }
213}
214
/// A single log entry: record header fields plus the typed payload.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Log sequence number; `0` until `TransactionLog::append` assigns one
    pub lsn: Lsn,
    /// Transaction ID
    pub txn_id: TxnId,
    /// Previous LSN for this transaction (for undo chain); overwritten by
    /// `TransactionLog::append`
    pub prev_lsn: Option<Lsn>,
    /// Creation time in microseconds since the Unix epoch (as set by `LogEntry::new`)
    pub timestamp: Timestamp,
    /// Entry type
    pub entry_type: LogEntryType,
}
229
230impl LogEntry {
231    /// Create new log entry
232    pub fn new(txn_id: TxnId, prev_lsn: Option<Lsn>, entry_type: LogEntryType) -> Self {
233        Self {
234            lsn: 0, // Will be assigned by log
235            txn_id,
236            prev_lsn,
237            timestamp: SystemTime::now()
238                .duration_since(UNIX_EPOCH)
239                .unwrap_or_default()
240                .as_micros() as Timestamp,
241            entry_type,
242        }
243    }
244
245    /// Serialize to bytes
246    pub fn to_bytes(&self) -> Vec<u8> {
247        reddb_file::encode_transaction_wal_record_frame(&reddb_file::TransactionWalRecordFrame {
248            lsn: self.lsn,
249            txn_id: self.txn_id,
250            prev_lsn: self.prev_lsn,
251            timestamp: self.timestamp,
252            entry_type_payload: self.entry_type.to_bytes(),
253        })
254    }
255
256    /// Deserialize from bytes
257    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
258        let frame = reddb_file::decode_transaction_wal_record_frame(data)
259            .map_err(transaction_wal_frame_error)?;
260        let (entry_type, consumed) = LogEntryType::from_bytes(&frame.entry_type_payload)?;
261        if consumed != frame.entry_type_payload.len() {
262            return Err(io::Error::new(
263                io::ErrorKind::InvalidData,
264                "WAL entry type length mismatch",
265            ));
266        }
267
268        Ok(Self {
269            lsn: frame.lsn,
270            txn_id: frame.txn_id,
271            prev_lsn: frame.prev_lsn,
272            timestamp: frame.timestamp,
273            entry_type,
274        })
275    }
276
277    /// Get the size of this entry when serialized
278    pub fn serialized_size(&self) -> usize {
279        reddb_file::transaction_wal_record_encoded_len(self.entry_type.to_bytes().len())
280    }
281}
282
/// WAL configuration
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Log file path; an empty path makes `TransactionLog::new` skip the file
    /// and keep entries in memory only
    pub path: PathBuf,
    /// Sync mode: when set, `TransactionLog::append` flushes and fsyncs the
    /// file after writing a commit record (other record types are not synced
    /// individually)
    pub sync_on_commit: bool,
    /// Capacity in bytes of the buffered writer wrapping the log file
    pub buffer_size: usize,
    /// Maximum log file size before rotation
    // NOTE(review): nothing in this file reads this field — confirm where rotation happens.
    pub max_file_size: u64,
    /// Checkpoint interval (in entries)
    // NOTE(review): nothing in this file reads this field — confirm which caller schedules checkpoints.
    pub checkpoint_interval: u64,
}
297
298impl Default for WalConfig {
299    fn default() -> Self {
300        Self {
301            path: reddb_file::layout::default_transaction_wal_path(),
302            sync_on_commit: true,
303            buffer_size: 64 * 1024,           // 64KB
304            max_file_size: 100 * 1024 * 1024, // 100MB
305            checkpoint_interval: 1000,
306        }
307    }
308}
309
310impl WalConfig {
311    /// Create config with path
312    pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
313        Self {
314            path: path.as_ref().to_path_buf(),
315            ..Default::default()
316        }
317    }
318}
319
/// WAL statistics, returned as a snapshot by `TransactionLog::stats`
#[derive(Debug, Clone, Default)]
pub struct WalStats {
    /// Total entries written
    pub entries_written: u64,
    /// Total bytes written, counting the 4-byte length prefix of each record
    pub bytes_written: u64,
    /// Total syncs
    pub syncs: u64,
    /// Checkpoints performed
    pub checkpoints: u64,
    /// Current file size; grown by the size of every appended record
    pub file_size: u64,
}
334
/// Transaction Log (WAL)
///
/// Records are appended to an optional file and also kept in a bounded
/// in-memory buffer that serves the `get_*` queries.
pub struct TransactionLog {
    /// Configuration
    config: WalConfig,
    /// Next LSN to assign
    next_lsn: AtomicU64,
    /// Log file (optional for in-memory mode)
    file: Option<Mutex<BufWriter<File>>>,
    /// In-memory buffer for entries; `append` trims it from the front once it
    /// exceeds its cap
    buffer: RwLock<VecDeque<LogEntry>>,
    /// Transaction prev_lsn tracking: last LSN appended per transaction
    txn_prev_lsn: RwLock<std::collections::HashMap<TxnId, Lsn>>,
    /// Statistics
    stats: RwLock<WalStats>,
    /// Last checkpoint LSN (`0` until the first checkpoint)
    last_checkpoint_lsn: AtomicU64,
}
352
353impl TransactionLog {
354    /// Create new transaction log
355    pub fn new(config: WalConfig) -> io::Result<Self> {
356        let file = if config.path.as_os_str().is_empty() {
357            None
358        } else {
359            let f = OpenOptions::new()
360                .create(true)
361                .append(true)
362                .read(true)
363                .open(&config.path)?;
364            Some(Mutex::new(BufWriter::with_capacity(config.buffer_size, f)))
365        };
366
367        Ok(Self {
368            config,
369            next_lsn: AtomicU64::new(1),
370            file,
371            buffer: RwLock::new(VecDeque::new()),
372            txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
373            stats: RwLock::new(WalStats::default()),
374            last_checkpoint_lsn: AtomicU64::new(0),
375        })
376    }
377
378    /// Create in-memory log (no persistence)
379    pub fn in_memory() -> Self {
380        Self {
381            config: WalConfig {
382                path: PathBuf::new(),
383                ..Default::default()
384            },
385            next_lsn: AtomicU64::new(1),
386            file: None,
387            buffer: RwLock::new(VecDeque::new()),
388            txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
389            stats: RwLock::new(WalStats::default()),
390            last_checkpoint_lsn: AtomicU64::new(0),
391        }
392    }
393
394    /// Append entry to log
395    pub fn append(&self, mut entry: LogEntry) -> io::Result<Lsn> {
396        // Assign LSN
397        let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
398        entry.lsn = lsn;
399
400        // Update prev_lsn tracking
401        {
402            let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
403            entry.prev_lsn = prev_lsns.get(&entry.txn_id).copied();
404            prev_lsns.insert(entry.txn_id, lsn);
405        }
406
407        let bytes = entry.to_bytes();
408
409        // Write to file if available
410        if let Some(ref file) = self.file {
411            let mut writer = io_mutex_guard(file, "wal file")?;
412            // Write length prefix
413            writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
414            writer.write_all(&bytes)?;
415
416            // Sync on commit if configured
417            if self.config.sync_on_commit && entry.entry_type.is_commit() {
418                writer.flush()?;
419                writer.get_mut().sync_all()?;
420
421                let mut stats = io_write_guard(&self.stats, "wal stats")?;
422                stats.syncs += 1;
423            }
424        }
425
426        // Store in buffer
427        {
428            let mut buffer = io_write_guard(&self.buffer, "wal buffer")?;
429            buffer.push_back(entry);
430
431            // Limit buffer size
432            while buffer.len() > 10000 {
433                buffer.pop_front();
434            }
435        }
436
437        // Update stats
438        {
439            let mut stats = io_write_guard(&self.stats, "wal stats")?;
440            stats.entries_written += 1;
441            stats.bytes_written += bytes.len() as u64 + 4;
442            stats.file_size += bytes.len() as u64 + 4;
443        }
444
445        Ok(lsn)
446    }
447
448    /// Log transaction begin
449    pub fn log_begin(&self, txn_id: TxnId) -> io::Result<Lsn> {
450        self.append(LogEntry::new(txn_id, None, LogEntryType::Begin))
451    }
452
453    /// Log transaction commit
454    pub fn log_commit(&self, txn_id: TxnId) -> io::Result<Lsn> {
455        let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Commit))?;
456
457        // Clean up prev_lsn tracking
458        {
459            let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
460            prev_lsns.remove(&txn_id);
461        }
462
463        Ok(lsn)
464    }
465
466    /// Log transaction abort
467    pub fn log_abort(&self, txn_id: TxnId) -> io::Result<Lsn> {
468        let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Abort))?;
469
470        // Clean up prev_lsn tracking
471        {
472            let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
473            prev_lsns.remove(&txn_id);
474        }
475
476        Ok(lsn)
477    }
478
479    /// Log insert operation
480    pub fn log_insert(&self, txn_id: TxnId, key: Vec<u8>, value: Vec<u8>) -> io::Result<Lsn> {
481        self.append(LogEntry::new(
482            txn_id,
483            None,
484            LogEntryType::Insert { key, value },
485        ))
486    }
487
488    /// Log update operation
489    pub fn log_update(
490        &self,
491        txn_id: TxnId,
492        key: Vec<u8>,
493        old_value: Vec<u8>,
494        new_value: Vec<u8>,
495    ) -> io::Result<Lsn> {
496        self.append(LogEntry::new(
497            txn_id,
498            None,
499            LogEntryType::Update {
500                key,
501                old_value,
502                new_value,
503            },
504        ))
505    }
506
507    /// Log delete operation
508    pub fn log_delete(&self, txn_id: TxnId, key: Vec<u8>, old_value: Vec<u8>) -> io::Result<Lsn> {
509        self.append(LogEntry::new(
510            txn_id,
511            None,
512            LogEntryType::Delete { key, old_value },
513        ))
514    }
515
516    /// Log savepoint
517    pub fn log_savepoint(&self, txn_id: TxnId, name: String) -> io::Result<Lsn> {
518        self.append(LogEntry::new(
519            txn_id,
520            None,
521            LogEntryType::Savepoint { name },
522        ))
523    }
524
525    /// Write checkpoint
526    pub fn checkpoint(&self, active_txns: Vec<TxnId>) -> io::Result<Lsn> {
527        let lsn = self.append(LogEntry::new(
528            0, // System transaction
529            None,
530            LogEntryType::Checkpoint { active_txns },
531        ))?;
532
533        // Force sync
534        if let Some(ref file) = self.file {
535            let mut writer = io_mutex_guard(file, "wal file")?;
536            writer.flush()?;
537            writer.get_mut().sync_all()?;
538        }
539
540        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
541
542        {
543            let mut stats = io_write_guard(&self.stats, "wal stats")?;
544            stats.checkpoints += 1;
545        }
546
547        Ok(lsn)
548    }
549
550    /// Flush buffer to disk
551    pub fn flush(&self) -> io::Result<()> {
552        if let Some(ref file) = self.file {
553            let mut writer = io_mutex_guard(file, "wal file")?;
554            writer.flush()?;
555            writer.get_mut().sync_all()?;
556        }
557        Ok(())
558    }
559
560    /// Get entries for a transaction (for undo)
561    pub fn get_txn_entries(&self, txn_id: TxnId) -> Vec<LogEntry> {
562        let buffer = recover_read_guard(&self.buffer);
563        buffer
564            .iter()
565            .filter(|e| e.txn_id == txn_id)
566            .cloned()
567            .collect()
568    }
569
570    /// Get entries since LSN
571    pub fn get_entries_since(&self, lsn: Lsn) -> Vec<LogEntry> {
572        let buffer = recover_read_guard(&self.buffer);
573        buffer.iter().filter(|e| e.lsn >= lsn).cloned().collect()
574    }
575
576    /// Get current LSN
577    pub fn current_lsn(&self) -> Lsn {
578        self.next_lsn.load(Ordering::SeqCst) - 1
579    }
580
    /// Returns the LSN recorded by the last `checkpoint` call, or `0` when no
    /// checkpoint has been written by this instance.
    pub fn last_checkpoint(&self) -> Lsn {
        self.last_checkpoint_lsn.load(Ordering::SeqCst)
    }
585
586    /// Get statistics
587    pub fn stats(&self) -> WalStats {
588        recover_read_guard(&self.stats).clone()
589    }
590
    /// Returns the configuration this log was created with.
    pub fn config(&self) -> &WalConfig {
        &self.config
    }
595}
596
/// Log reader for recovery
///
/// Reads length-prefixed records sequentially from a log file.
pub struct LogReader {
    /// Buffered handle positioned at the next unread record
    reader: BufReader<File>,
}
601
602impl LogReader {
603    /// Open log file for reading
604    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
605        let file = File::open(path)?;
606        Ok(Self {
607            reader: BufReader::new(file),
608        })
609    }
610
611    /// Read all entries
612    pub fn read_all(&mut self) -> io::Result<Vec<LogEntry>> {
613        let mut entries = Vec::new();
614
615        loop {
616            match self.read_entry() {
617                Ok(entry) => entries.push(entry),
618                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
619                Err(e) => return Err(e),
620            }
621        }
622
623        Ok(entries)
624    }
625
626    /// Read single entry
627    pub fn read_entry(&mut self) -> io::Result<LogEntry> {
628        let mut len_buf = [0u8; 4];
629        self.reader.read_exact(&mut len_buf)?;
630        let len = u32::from_le_bytes(len_buf) as usize;
631
632        let mut data = vec![0u8; len];
633        self.reader.read_exact(&mut data)?;
634
635        LogEntry::from_bytes(&data)
636    }
637}
638
639#[cfg(test)]
640mod tests {
641    use super::*;
642
643    #[test]
644    fn test_log_entry_serialize() {
645        let entry = LogEntry {
646            lsn: 42,
647            txn_id: 1,
648            prev_lsn: Some(40),
649            timestamp: 1234567890,
650            entry_type: LogEntryType::Insert {
651                key: b"key1".to_vec(),
652                value: b"value1".to_vec(),
653            },
654        };
655
656        let bytes = entry.to_bytes();
657        let recovered = LogEntry::from_bytes(&bytes).unwrap();
658
659        assert_eq!(recovered.lsn, entry.lsn);
660        assert_eq!(recovered.txn_id, entry.txn_id);
661        assert_eq!(recovered.prev_lsn, entry.prev_lsn);
662    }
663
664    #[test]
665    fn test_in_memory_log() {
666        let log = TransactionLog::in_memory();
667
668        let lsn1 = log.log_begin(1).unwrap();
669        let lsn2 = log
670            .log_insert(1, b"key".to_vec(), b"value".to_vec())
671            .unwrap();
672        let lsn3 = log.log_commit(1).unwrap();
673
674        assert_eq!(lsn1, 1);
675        assert_eq!(lsn2, 2);
676        assert_eq!(lsn3, 3);
677
678        let entries = log.get_txn_entries(1);
679        assert_eq!(entries.len(), 3);
680    }
681
682    #[test]
683    fn test_checkpoint() {
684        let log = TransactionLog::in_memory();
685
686        log.log_begin(1).unwrap();
687        log.log_begin(2).unwrap();
688
689        let cp_lsn = log.checkpoint(vec![1, 2]).unwrap();
690        assert_eq!(log.last_checkpoint(), cp_lsn);
691    }
692
693    #[test]
694    fn test_log_entry_types() {
695        let types = vec![
696            LogEntryType::Begin,
697            LogEntryType::Commit,
698            LogEntryType::Abort,
699            LogEntryType::Insert {
700                key: b"k".to_vec(),
701                value: b"v".to_vec(),
702            },
703            LogEntryType::Update {
704                key: b"k".to_vec(),
705                old_value: b"old".to_vec(),
706                new_value: b"new".to_vec(),
707            },
708            LogEntryType::Delete {
709                key: b"k".to_vec(),
710                old_value: b"v".to_vec(),
711            },
712            LogEntryType::Checkpoint {
713                active_txns: vec![1, 2, 3],
714            },
715            LogEntryType::Savepoint {
716                name: "sp1".to_string(),
717            },
718            LogEntryType::End,
719        ];
720
721        for t in types {
722            let bytes = t.to_bytes();
723            let (recovered, _) = LogEntryType::from_bytes(&bytes).unwrap();
724            assert_eq!(recovered, t);
725        }
726    }
727
728    #[test]
729    fn test_prev_lsn_chain() {
730        let log = TransactionLog::in_memory();
731
732        log.log_begin(1).unwrap(); // LSN 1, prev_lsn = None
733        log.log_insert(1, b"k1".to_vec(), b"v1".to_vec()).unwrap(); // LSN 2, prev_lsn = 1
734        log.log_insert(1, b"k2".to_vec(), b"v2".to_vec()).unwrap(); // LSN 3, prev_lsn = 2
735
736        let entries = log.get_txn_entries(1);
737        assert_eq!(entries[0].prev_lsn, None);
738        assert_eq!(entries[1].prev_lsn, Some(1));
739        assert_eq!(entries[2].prev_lsn, Some(2));
740    }
741
    /// A payload that ends before its declared contents must be reported as
    /// `UnexpectedEof`.
    // NOTE(review): the byte layout (tag 3, then a little-endian length of 4
    // with only one key byte present) is assumed from the reddb_file encoding
    // — confirm against that crate.
    #[test]
    fn test_log_entry_type_rejects_truncated_insert() {
        let err = LogEntryType::from_bytes(&[3, 4, 0, 0, 0, b'k'])
            .expect_err("truncated insert should fail");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }
748
    /// A record frame with its last two bytes cut off must fail to decode
    /// with `UnexpectedEof`.
    #[test]
    fn test_log_entry_rejects_truncated_type_payload() {
        let entry = LogEntry {
            lsn: 7,
            txn_id: 9,
            prev_lsn: Some(3),
            timestamp: 42,
            entry_type: LogEntryType::Insert {
                key: b"hello".to_vec(),
                value: b"world".to_vec(),
            },
        };

        // Drop the tail of the encoded frame to simulate a torn write.
        let mut bytes = entry.to_bytes();
        bytes.truncate(bytes.len() - 2);

        let err = LogEntry::from_bytes(&bytes).expect_err("truncated entry should fail");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }
768}