1use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13pub type TxnId = u64;
15
16pub type Lsn = u64;
18
19pub type Timestamp = u64;
21
22fn read_bytes<'a>(
23 data: &'a [u8],
24 offset: &mut usize,
25 len: usize,
26 context: &'static str,
27) -> io::Result<&'a [u8]> {
28 let end = offset.saturating_add(len);
29 if end > data.len() {
30 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, context));
31 }
32 let bytes = &data[*offset..end];
33 *offset = end;
34 Ok(bytes)
35}
36
37fn read_array<const N: usize>(
38 data: &[u8],
39 offset: &mut usize,
40 context: &'static str,
41) -> io::Result<[u8; N]> {
42 let bytes = read_bytes(data, offset, N, context)?;
43 let mut array = [0u8; N];
44 array.copy_from_slice(bytes);
45 Ok(array)
46}
47
48fn read_u32(data: &[u8], offset: &mut usize, context: &'static str) -> io::Result<u32> {
49 Ok(u32::from_le_bytes(read_array::<4>(data, offset, context)?))
50}
51
52fn read_u64(data: &[u8], offset: &mut usize, context: &'static str) -> io::Result<u64> {
53 Ok(u64::from_le_bytes(read_array::<8>(data, offset, context)?))
54}
55
56fn io_lock_error(context: &'static str) -> io::Error {
57 io::Error::other(format!("{context} lock poisoned"))
58}
59
60fn io_read_guard<'a, T>(
61 lock: &'a RwLock<T>,
62 context: &'static str,
63) -> io::Result<RwLockReadGuard<'a, T>> {
64 lock.read().map_err(|_| io_lock_error(context))
65}
66
67fn io_write_guard<'a, T>(
68 lock: &'a RwLock<T>,
69 context: &'static str,
70) -> io::Result<RwLockWriteGuard<'a, T>> {
71 lock.write().map_err(|_| io_lock_error(context))
72}
73
74fn io_mutex_guard<'a, T>(
75 lock: &'a Mutex<T>,
76 context: &'static str,
77) -> io::Result<MutexGuard<'a, T>> {
78 lock.lock().map_err(|_| io_lock_error(context))
79}
80
81fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
82 match lock.read() {
83 Ok(guard) => guard,
84 Err(poisoned) => poisoned.into_inner(),
85 }
86}
87
88#[derive(Debug, Clone, PartialEq, Eq)]
90pub enum LogEntryType {
91 Begin,
93 Commit,
95 Abort,
97 Insert { key: Vec<u8>, value: Vec<u8> },
99 Update {
101 key: Vec<u8>,
102 old_value: Vec<u8>,
103 new_value: Vec<u8>,
104 },
105 Delete { key: Vec<u8>, old_value: Vec<u8> },
107 Checkpoint { active_txns: Vec<TxnId> },
109 Savepoint { name: String },
111 RollbackToSavepoint { name: String },
113 Compensate { original_lsn: Lsn },
115 End,
117}
118
119impl LogEntryType {
120 pub fn is_commit(&self) -> bool {
122 matches!(self, LogEntryType::Commit)
123 }
124
125 pub fn is_abort(&self) -> bool {
127 matches!(self, LogEntryType::Abort)
128 }
129
130 pub fn is_data_modification(&self) -> bool {
132 matches!(
133 self,
134 LogEntryType::Insert { .. } | LogEntryType::Update { .. } | LogEntryType::Delete { .. }
135 )
136 }
137
138 pub fn to_bytes(&self) -> Vec<u8> {
140 let mut buf = Vec::new();
141
142 match self {
143 LogEntryType::Begin => buf.push(0),
144 LogEntryType::Commit => buf.push(1),
145 LogEntryType::Abort => buf.push(2),
146 LogEntryType::Insert { key, value } => {
147 buf.push(3);
148 buf.extend(&(key.len() as u32).to_le_bytes());
149 buf.extend(key);
150 buf.extend(&(value.len() as u32).to_le_bytes());
151 buf.extend(value);
152 }
153 LogEntryType::Update {
154 key,
155 old_value,
156 new_value,
157 } => {
158 buf.push(4);
159 buf.extend(&(key.len() as u32).to_le_bytes());
160 buf.extend(key);
161 buf.extend(&(old_value.len() as u32).to_le_bytes());
162 buf.extend(old_value);
163 buf.extend(&(new_value.len() as u32).to_le_bytes());
164 buf.extend(new_value);
165 }
166 LogEntryType::Delete { key, old_value } => {
167 buf.push(5);
168 buf.extend(&(key.len() as u32).to_le_bytes());
169 buf.extend(key);
170 buf.extend(&(old_value.len() as u32).to_le_bytes());
171 buf.extend(old_value);
172 }
173 LogEntryType::Checkpoint { active_txns } => {
174 buf.push(6);
175 buf.extend(&(active_txns.len() as u32).to_le_bytes());
176 for txn in active_txns {
177 buf.extend(&txn.to_le_bytes());
178 }
179 }
180 LogEntryType::Savepoint { name } => {
181 buf.push(7);
182 let name_bytes = name.as_bytes();
183 buf.extend(&(name_bytes.len() as u32).to_le_bytes());
184 buf.extend(name_bytes);
185 }
186 LogEntryType::RollbackToSavepoint { name } => {
187 buf.push(8);
188 let name_bytes = name.as_bytes();
189 buf.extend(&(name_bytes.len() as u32).to_le_bytes());
190 buf.extend(name_bytes);
191 }
192 LogEntryType::Compensate { original_lsn } => {
193 buf.push(9);
194 buf.extend(&original_lsn.to_le_bytes());
195 }
196 LogEntryType::End => buf.push(10),
197 }
198
199 buf
200 }
201
202 pub fn from_bytes(data: &[u8]) -> io::Result<(Self, usize)> {
204 if data.is_empty() {
205 return Err(io::Error::new(io::ErrorKind::InvalidData, "Empty data"));
206 }
207
208 let mut offset = 0;
209 let tag = read_bytes(data, &mut offset, 1, "Missing log entry tag")?[0];
210
211 let entry = match tag {
212 0 => LogEntryType::Begin,
213 1 => LogEntryType::Commit,
214 2 => LogEntryType::Abort,
215 3 => {
216 let key_len =
218 read_u32(data, &mut offset, "Missing WAL insert key length")? as usize;
219 let key =
220 read_bytes(data, &mut offset, key_len, "Truncated WAL insert key")?.to_vec();
221 let value_len =
222 read_u32(data, &mut offset, "Missing WAL insert value length")? as usize;
223 let value = read_bytes(data, &mut offset, value_len, "Truncated WAL insert value")?
224 .to_vec();
225 LogEntryType::Insert { key, value }
226 }
227 4 => {
228 let key_len =
230 read_u32(data, &mut offset, "Missing WAL update key length")? as usize;
231 let key =
232 read_bytes(data, &mut offset, key_len, "Truncated WAL update key")?.to_vec();
233 let old_len =
234 read_u32(data, &mut offset, "Missing WAL update old value length")? as usize;
235 let old_value =
236 read_bytes(data, &mut offset, old_len, "Truncated WAL update old value")?
237 .to_vec();
238 let new_len =
239 read_u32(data, &mut offset, "Missing WAL update new value length")? as usize;
240 let new_value =
241 read_bytes(data, &mut offset, new_len, "Truncated WAL update new value")?
242 .to_vec();
243 LogEntryType::Update {
244 key,
245 old_value,
246 new_value,
247 }
248 }
249 5 => {
250 let key_len =
252 read_u32(data, &mut offset, "Missing WAL delete key length")? as usize;
253 let key =
254 read_bytes(data, &mut offset, key_len, "Truncated WAL delete key")?.to_vec();
255 let old_len =
256 read_u32(data, &mut offset, "Missing WAL delete old value length")? as usize;
257 let old_value =
258 read_bytes(data, &mut offset, old_len, "Truncated WAL delete old value")?
259 .to_vec();
260 LogEntryType::Delete { key, old_value }
261 }
262 6 => {
263 let count =
265 read_u32(data, &mut offset, "Missing WAL checkpoint txn count")? as usize;
266 let mut active_txns = Vec::with_capacity(count);
267 for _ in 0..count {
268 let txn =
269 read_u64(data, &mut offset, "Truncated WAL checkpoint transaction id")?;
270 active_txns.push(txn);
271 }
272 LogEntryType::Checkpoint { active_txns }
273 }
274 7 => {
275 let name_len =
277 read_u32(data, &mut offset, "Missing WAL savepoint name length")? as usize;
278 let name = String::from_utf8_lossy(read_bytes(
279 data,
280 &mut offset,
281 name_len,
282 "Truncated WAL savepoint name",
283 )?)
284 .to_string();
285 LogEntryType::Savepoint { name }
286 }
287 8 => {
288 let name_len = read_u32(
290 data,
291 &mut offset,
292 "Missing WAL rollback-to-savepoint name length",
293 )? as usize;
294 let name = String::from_utf8_lossy(read_bytes(
295 data,
296 &mut offset,
297 name_len,
298 "Truncated WAL rollback-to-savepoint name",
299 )?)
300 .to_string();
301 LogEntryType::RollbackToSavepoint { name }
302 }
303 9 => {
304 let original_lsn =
306 read_u64(data, &mut offset, "Truncated WAL compensate original LSN")?;
307 LogEntryType::Compensate { original_lsn }
308 }
309 10 => LogEntryType::End,
310 _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid tag")),
311 };
312
313 Ok((entry, offset))
314 }
315}
316
317#[derive(Debug, Clone)]
319pub struct LogEntry {
320 pub lsn: Lsn,
322 pub txn_id: TxnId,
324 pub prev_lsn: Option<Lsn>,
326 pub timestamp: Timestamp,
328 pub entry_type: LogEntryType,
330}
331
332impl LogEntry {
333 pub fn new(txn_id: TxnId, prev_lsn: Option<Lsn>, entry_type: LogEntryType) -> Self {
335 Self {
336 lsn: 0, txn_id,
338 prev_lsn,
339 timestamp: SystemTime::now()
340 .duration_since(UNIX_EPOCH)
341 .unwrap_or_default()
342 .as_micros() as Timestamp,
343 entry_type,
344 }
345 }
346
347 pub fn to_bytes(&self) -> Vec<u8> {
349 let mut buf = Vec::new();
350
351 buf.extend(&self.lsn.to_le_bytes());
353 buf.extend(&self.txn_id.to_le_bytes());
354 buf.extend(&self.prev_lsn.unwrap_or(0).to_le_bytes());
355 buf.extend(&self.timestamp.to_le_bytes());
356
357 let type_bytes = self.entry_type.to_bytes();
359 buf.extend(&(type_bytes.len() as u32).to_le_bytes());
360 buf.extend(&type_bytes);
361
362 let checksum: u8 = buf.iter().fold(0, |acc, &b| acc ^ b);
364 buf.push(checksum);
365
366 buf
367 }
368
369 pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
371 if data.len() < 37 {
372 return Err(io::Error::new(io::ErrorKind::InvalidData, "Too short"));
374 }
375
376 let mut offset = 0;
377 let lsn = read_u64(data, &mut offset, "Missing WAL entry LSN")?;
378 let txn_id = read_u64(data, &mut offset, "Missing WAL entry txn id")?;
379 let prev_lsn_raw = read_u64(data, &mut offset, "Missing WAL entry prev_lsn")?;
380 let prev_lsn = if prev_lsn_raw == 0 {
381 None
382 } else {
383 Some(prev_lsn_raw)
384 };
385 let timestamp = read_u64(data, &mut offset, "Missing WAL entry timestamp")?;
386 let type_len = read_u32(data, &mut offset, "Missing WAL entry type length")? as usize;
387 let entry_type_bytes = read_bytes(
388 data,
389 &mut offset,
390 type_len,
391 "Truncated WAL entry type bytes",
392 )?;
393 let (entry_type, consumed) = LogEntryType::from_bytes(entry_type_bytes)?;
394 if consumed != entry_type_bytes.len() {
395 return Err(io::Error::new(
396 io::ErrorKind::InvalidData,
397 "WAL entry type length mismatch",
398 ));
399 }
400
401 let stored_checksum = *data.get(offset).ok_or_else(|| {
403 io::Error::new(io::ErrorKind::UnexpectedEof, "Missing WAL entry checksum")
404 })?;
405 let computed: u8 = data[..offset].iter().fold(0, |acc, &b| acc ^ b);
406 if stored_checksum != computed {
407 return Err(io::Error::new(
408 io::ErrorKind::InvalidData,
409 "Checksum mismatch",
410 ));
411 }
412
413 Ok(Self {
414 lsn,
415 txn_id,
416 prev_lsn,
417 timestamp,
418 entry_type,
419 })
420 }
421
422 pub fn serialized_size(&self) -> usize {
424 32 + 4 + self.entry_type.to_bytes().len() + 1
425 }
426}
427
428#[derive(Debug, Clone)]
430pub struct WalConfig {
431 pub path: PathBuf,
433 pub sync_on_commit: bool,
435 pub buffer_size: usize,
437 pub max_file_size: u64,
439 pub checkpoint_interval: u64,
441}
442
443impl Default for WalConfig {
444 fn default() -> Self {
445 Self {
446 path: PathBuf::from("wal.log"),
447 sync_on_commit: true,
448 buffer_size: 64 * 1024, max_file_size: 100 * 1024 * 1024, checkpoint_interval: 1000,
451 }
452 }
453}
454
455impl WalConfig {
456 pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
458 Self {
459 path: path.as_ref().to_path_buf(),
460 ..Default::default()
461 }
462 }
463}
464
465#[derive(Debug, Clone, Default)]
467pub struct WalStats {
468 pub entries_written: u64,
470 pub bytes_written: u64,
472 pub syncs: u64,
474 pub checkpoints: u64,
476 pub file_size: u64,
478}
479
480pub struct TransactionLog {
482 config: WalConfig,
484 next_lsn: AtomicU64,
486 file: Option<Mutex<BufWriter<File>>>,
488 buffer: RwLock<VecDeque<LogEntry>>,
490 txn_prev_lsn: RwLock<std::collections::HashMap<TxnId, Lsn>>,
492 stats: RwLock<WalStats>,
494 last_checkpoint_lsn: AtomicU64,
496}
497
498impl TransactionLog {
499 pub fn new(config: WalConfig) -> io::Result<Self> {
501 let file = if config.path.as_os_str().is_empty() {
502 None
503 } else {
504 let f = OpenOptions::new()
505 .create(true)
506 .append(true)
507 .read(true)
508 .open(&config.path)?;
509 Some(Mutex::new(BufWriter::with_capacity(config.buffer_size, f)))
510 };
511
512 Ok(Self {
513 config,
514 next_lsn: AtomicU64::new(1),
515 file,
516 buffer: RwLock::new(VecDeque::new()),
517 txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
518 stats: RwLock::new(WalStats::default()),
519 last_checkpoint_lsn: AtomicU64::new(0),
520 })
521 }
522
523 pub fn in_memory() -> Self {
525 Self {
526 config: WalConfig {
527 path: PathBuf::new(),
528 ..Default::default()
529 },
530 next_lsn: AtomicU64::new(1),
531 file: None,
532 buffer: RwLock::new(VecDeque::new()),
533 txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
534 stats: RwLock::new(WalStats::default()),
535 last_checkpoint_lsn: AtomicU64::new(0),
536 }
537 }
538
539 pub fn append(&self, mut entry: LogEntry) -> io::Result<Lsn> {
541 let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
543 entry.lsn = lsn;
544
545 {
547 let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
548 entry.prev_lsn = prev_lsns.get(&entry.txn_id).copied();
549 prev_lsns.insert(entry.txn_id, lsn);
550 }
551
552 let bytes = entry.to_bytes();
553
554 if let Some(ref file) = self.file {
556 let mut writer = io_mutex_guard(file, "wal file")?;
557 writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
559 writer.write_all(&bytes)?;
560
561 if self.config.sync_on_commit && entry.entry_type.is_commit() {
563 writer.flush()?;
564 writer.get_mut().sync_all()?;
565
566 let mut stats = io_write_guard(&self.stats, "wal stats")?;
567 stats.syncs += 1;
568 }
569 }
570
571 {
573 let mut buffer = io_write_guard(&self.buffer, "wal buffer")?;
574 buffer.push_back(entry);
575
576 while buffer.len() > 10000 {
578 buffer.pop_front();
579 }
580 }
581
582 {
584 let mut stats = io_write_guard(&self.stats, "wal stats")?;
585 stats.entries_written += 1;
586 stats.bytes_written += bytes.len() as u64 + 4;
587 stats.file_size += bytes.len() as u64 + 4;
588 }
589
590 Ok(lsn)
591 }
592
593 pub fn log_begin(&self, txn_id: TxnId) -> io::Result<Lsn> {
595 self.append(LogEntry::new(txn_id, None, LogEntryType::Begin))
596 }
597
598 pub fn log_commit(&self, txn_id: TxnId) -> io::Result<Lsn> {
600 let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Commit))?;
601
602 {
604 let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
605 prev_lsns.remove(&txn_id);
606 }
607
608 Ok(lsn)
609 }
610
611 pub fn log_abort(&self, txn_id: TxnId) -> io::Result<Lsn> {
613 let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Abort))?;
614
615 {
617 let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
618 prev_lsns.remove(&txn_id);
619 }
620
621 Ok(lsn)
622 }
623
624 pub fn log_insert(&self, txn_id: TxnId, key: Vec<u8>, value: Vec<u8>) -> io::Result<Lsn> {
626 self.append(LogEntry::new(
627 txn_id,
628 None,
629 LogEntryType::Insert { key, value },
630 ))
631 }
632
633 pub fn log_update(
635 &self,
636 txn_id: TxnId,
637 key: Vec<u8>,
638 old_value: Vec<u8>,
639 new_value: Vec<u8>,
640 ) -> io::Result<Lsn> {
641 self.append(LogEntry::new(
642 txn_id,
643 None,
644 LogEntryType::Update {
645 key,
646 old_value,
647 new_value,
648 },
649 ))
650 }
651
652 pub fn log_delete(&self, txn_id: TxnId, key: Vec<u8>, old_value: Vec<u8>) -> io::Result<Lsn> {
654 self.append(LogEntry::new(
655 txn_id,
656 None,
657 LogEntryType::Delete { key, old_value },
658 ))
659 }
660
661 pub fn log_savepoint(&self, txn_id: TxnId, name: String) -> io::Result<Lsn> {
663 self.append(LogEntry::new(
664 txn_id,
665 None,
666 LogEntryType::Savepoint { name },
667 ))
668 }
669
670 pub fn checkpoint(&self, active_txns: Vec<TxnId>) -> io::Result<Lsn> {
672 let lsn = self.append(LogEntry::new(
673 0, None,
675 LogEntryType::Checkpoint { active_txns },
676 ))?;
677
678 if let Some(ref file) = self.file {
680 let mut writer = io_mutex_guard(file, "wal file")?;
681 writer.flush()?;
682 writer.get_mut().sync_all()?;
683 }
684
685 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
686
687 {
688 let mut stats = io_write_guard(&self.stats, "wal stats")?;
689 stats.checkpoints += 1;
690 }
691
692 Ok(lsn)
693 }
694
695 pub fn flush(&self) -> io::Result<()> {
697 if let Some(ref file) = self.file {
698 let mut writer = io_mutex_guard(file, "wal file")?;
699 writer.flush()?;
700 writer.get_mut().sync_all()?;
701 }
702 Ok(())
703 }
704
705 pub fn get_txn_entries(&self, txn_id: TxnId) -> Vec<LogEntry> {
707 let buffer = recover_read_guard(&self.buffer);
708 buffer
709 .iter()
710 .filter(|e| e.txn_id == txn_id)
711 .cloned()
712 .collect()
713 }
714
715 pub fn get_entries_since(&self, lsn: Lsn) -> Vec<LogEntry> {
717 let buffer = recover_read_guard(&self.buffer);
718 buffer.iter().filter(|e| e.lsn >= lsn).cloned().collect()
719 }
720
721 pub fn current_lsn(&self) -> Lsn {
723 self.next_lsn.load(Ordering::SeqCst) - 1
724 }
725
726 pub fn last_checkpoint(&self) -> Lsn {
728 self.last_checkpoint_lsn.load(Ordering::SeqCst)
729 }
730
731 pub fn stats(&self) -> WalStats {
733 recover_read_guard(&self.stats).clone()
734 }
735
736 pub fn config(&self) -> &WalConfig {
738 &self.config
739 }
740}
741
742pub struct LogReader {
744 reader: BufReader<File>,
745}
746
747impl LogReader {
748 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
750 let file = File::open(path)?;
751 Ok(Self {
752 reader: BufReader::new(file),
753 })
754 }
755
756 pub fn read_all(&mut self) -> io::Result<Vec<LogEntry>> {
758 let mut entries = Vec::new();
759
760 loop {
761 match self.read_entry() {
762 Ok(entry) => entries.push(entry),
763 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
764 Err(e) => return Err(e),
765 }
766 }
767
768 Ok(entries)
769 }
770
771 pub fn read_entry(&mut self) -> io::Result<LogEntry> {
773 let mut len_buf = [0u8; 4];
774 self.reader.read_exact(&mut len_buf)?;
775 let len = u32::from_le_bytes(len_buf) as usize;
776
777 let mut data = vec![0u8; len];
778 self.reader.read_exact(&mut data)?;
779
780 LogEntry::from_bytes(&data)
781 }
782}
783
784#[cfg(test)]
785mod tests {
786 use super::*;
787
788 #[test]
789 fn test_log_entry_serialize() {
790 let entry = LogEntry {
791 lsn: 42,
792 txn_id: 1,
793 prev_lsn: Some(40),
794 timestamp: 1234567890,
795 entry_type: LogEntryType::Insert {
796 key: b"key1".to_vec(),
797 value: b"value1".to_vec(),
798 },
799 };
800
801 let bytes = entry.to_bytes();
802 let recovered = LogEntry::from_bytes(&bytes).unwrap();
803
804 assert_eq!(recovered.lsn, entry.lsn);
805 assert_eq!(recovered.txn_id, entry.txn_id);
806 assert_eq!(recovered.prev_lsn, entry.prev_lsn);
807 }
808
809 #[test]
810 fn test_in_memory_log() {
811 let log = TransactionLog::in_memory();
812
813 let lsn1 = log.log_begin(1).unwrap();
814 let lsn2 = log
815 .log_insert(1, b"key".to_vec(), b"value".to_vec())
816 .unwrap();
817 let lsn3 = log.log_commit(1).unwrap();
818
819 assert_eq!(lsn1, 1);
820 assert_eq!(lsn2, 2);
821 assert_eq!(lsn3, 3);
822
823 let entries = log.get_txn_entries(1);
824 assert_eq!(entries.len(), 3);
825 }
826
827 #[test]
828 fn test_checkpoint() {
829 let log = TransactionLog::in_memory();
830
831 log.log_begin(1).unwrap();
832 log.log_begin(2).unwrap();
833
834 let cp_lsn = log.checkpoint(vec![1, 2]).unwrap();
835 assert_eq!(log.last_checkpoint(), cp_lsn);
836 }
837
838 #[test]
839 fn test_log_entry_types() {
840 let types = vec![
841 LogEntryType::Begin,
842 LogEntryType::Commit,
843 LogEntryType::Abort,
844 LogEntryType::Insert {
845 key: b"k".to_vec(),
846 value: b"v".to_vec(),
847 },
848 LogEntryType::Update {
849 key: b"k".to_vec(),
850 old_value: b"old".to_vec(),
851 new_value: b"new".to_vec(),
852 },
853 LogEntryType::Delete {
854 key: b"k".to_vec(),
855 old_value: b"v".to_vec(),
856 },
857 LogEntryType::Checkpoint {
858 active_txns: vec![1, 2, 3],
859 },
860 LogEntryType::Savepoint {
861 name: "sp1".to_string(),
862 },
863 LogEntryType::End,
864 ];
865
866 for t in types {
867 let bytes = t.to_bytes();
868 let (recovered, _) = LogEntryType::from_bytes(&bytes).unwrap();
869 assert_eq!(recovered, t);
870 }
871 }
872
873 #[test]
874 fn test_prev_lsn_chain() {
875 let log = TransactionLog::in_memory();
876
877 log.log_begin(1).unwrap(); log.log_insert(1, b"k1".to_vec(), b"v1".to_vec()).unwrap(); log.log_insert(1, b"k2".to_vec(), b"v2".to_vec()).unwrap(); let entries = log.get_txn_entries(1);
882 assert_eq!(entries[0].prev_lsn, None);
883 assert_eq!(entries[1].prev_lsn, Some(1));
884 assert_eq!(entries[2].prev_lsn, Some(2));
885 }
886
887 #[test]
888 fn test_log_entry_type_rejects_truncated_insert() {
889 let err = LogEntryType::from_bytes(&[3, 4, 0, 0, 0, b'k'])
890 .expect_err("truncated insert should fail");
891 assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
892 }
893
894 #[test]
895 fn test_log_entry_rejects_truncated_type_payload() {
896 let entry = LogEntry {
897 lsn: 7,
898 txn_id: 9,
899 prev_lsn: Some(3),
900 timestamp: 42,
901 entry_type: LogEntryType::Insert {
902 key: b"hello".to_vec(),
903 value: b"world".to_vec(),
904 },
905 };
906
907 let mut bytes = entry.to_bytes();
908 bytes.truncate(bytes.len() - 2);
909
910 let err = LogEntry::from_bytes(&bytes).expect_err("truncated entry should fail");
911 assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
912 }
913}