1use std::collections::VecDeque;
6use std::fs::{File, OpenOptions};
7use std::io::{self, BufReader, BufWriter, Read, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13pub type TxnId = u64;
15
16pub type Lsn = u64;
18
19pub type Timestamp = u64;
21
/// Builds the `io::Error` reported when the lock described by `context` is poisoned.
///
/// The message has the form `"<context> lock poisoned"` and the error kind is `Other`.
fn io_lock_error(context: &'static str) -> io::Error {
    let message = format!("{context} lock poisoned");
    io::Error::other(message)
}

/// Takes a shared read guard on `lock`, turning a poisoned lock into an `io::Error`.
fn io_read_guard<'a, T>(
    lock: &'a RwLock<T>,
    context: &'static str,
) -> io::Result<RwLockReadGuard<'a, T>> {
    match lock.read() {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => Err(io_lock_error(context)),
    }
}

/// Takes an exclusive write guard on `lock`, turning a poisoned lock into an `io::Error`.
fn io_write_guard<'a, T>(
    lock: &'a RwLock<T>,
    context: &'static str,
) -> io::Result<RwLockWriteGuard<'a, T>> {
    match lock.write() {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => Err(io_lock_error(context)),
    }
}

/// Locks `lock`, turning a poisoned mutex into an `io::Error`.
fn io_mutex_guard<'a, T>(
    lock: &'a Mutex<T>,
    context: &'static str,
) -> io::Result<MutexGuard<'a, T>> {
    match lock.lock() {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => Err(io_lock_error(context)),
    }
}

/// Takes a shared read guard on `lock`, ignoring poisoning.
///
/// Used by read-only accessors that prefer returning possibly stale data over failing.
fn recover_read_guard<T>(lock: &RwLock<T>) -> RwLockReadGuard<'_, T> {
    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
53
54fn transaction_wal_frame_error(err: reddb_file::RdbFileError) -> io::Error {
55 let message = err.to_string();
56 let kind = if message.contains("missing")
57 || message.contains("empty")
58 || message.contains("truncated")
59 || message.contains("too short")
60 {
61 io::ErrorKind::UnexpectedEof
62 } else {
63 io::ErrorKind::InvalidData
64 };
65 io::Error::new(kind, message)
66}
67
68fn transaction_wal_payload_error(err: reddb_file::RdbFileError) -> io::Error {
69 transaction_wal_frame_error(err)
70}
71
/// Kind of record stored in the transaction write-ahead log, together with its payload.
///
/// Encoded and decoded through `reddb_file::TransactionWalEntryPayload`; see
/// `LogEntryType::to_bytes` / `LogEntryType::from_bytes`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogEntryType {
    /// Start of a transaction (written by `TransactionLog::log_begin`).
    Begin,
    /// Transaction commit (written by `TransactionLog::log_commit`). When
    /// `WalConfig::sync_on_commit` is set, `TransactionLog::append` flushes and fsyncs the file
    /// after writing this record.
    Commit,
    /// Transaction abort (written by `TransactionLog::log_abort`).
    Abort,
    /// A newly inserted `key` → `value` pair.
    Insert { key: Vec<u8>, value: Vec<u8> },
    /// Replacement of `key`'s value, carrying both the previous and the new bytes.
    Update {
        key: Vec<u8>,
        old_value: Vec<u8>,
        new_value: Vec<u8>,
    },
    /// Removal of `key`, carrying the value it held before deletion.
    Delete { key: Vec<u8>, old_value: Vec<u8> },
    /// Checkpoint marker listing the transaction ids passed to `TransactionLog::checkpoint`.
    Checkpoint { active_txns: Vec<TxnId> },
    /// Named savepoint within a transaction (written by `TransactionLog::log_savepoint`).
    Savepoint { name: String },
    /// Rollback to the savepoint called `name`.
    /// NOTE(review): nothing in this file writes this variant; confirm who produces it.
    RollbackToSavepoint { name: String },
    /// Presumably a compensation record that undoes the record at `original_lsn`.
    /// NOTE(review): no producer is visible in this file; confirm the semantics.
    Compensate { original_lsn: Lsn },
    /// End-of-transaction marker.
    /// NOTE(review): no producer is visible in this file; confirm the semantics.
    End,
}
102
103impl LogEntryType {
104 pub fn is_commit(&self) -> bool {
106 matches!(self, LogEntryType::Commit)
107 }
108
109 pub fn is_abort(&self) -> bool {
111 matches!(self, LogEntryType::Abort)
112 }
113
114 pub fn is_data_modification(&self) -> bool {
116 matches!(
117 self,
118 LogEntryType::Insert { .. } | LogEntryType::Update { .. } | LogEntryType::Delete { .. }
119 )
120 }
121
122 pub fn to_bytes(&self) -> Vec<u8> {
124 reddb_file::encode_transaction_wal_entry_payload(&self.to_file_payload())
125 }
126
127 pub fn from_bytes(data: &[u8]) -> io::Result<(Self, usize)> {
129 let (payload, consumed) = reddb_file::decode_transaction_wal_entry_payload(data)
130 .map_err(transaction_wal_payload_error)?;
131 Ok((Self::from_file_payload(payload), consumed))
132 }
133
134 fn to_file_payload(&self) -> reddb_file::TransactionWalEntryPayload {
135 match self {
136 LogEntryType::Begin => reddb_file::TransactionWalEntryPayload::Begin,
137 LogEntryType::Commit => reddb_file::TransactionWalEntryPayload::Commit,
138 LogEntryType::Abort => reddb_file::TransactionWalEntryPayload::Abort,
139 LogEntryType::Insert { key, value } => reddb_file::TransactionWalEntryPayload::Insert {
140 key: key.clone(),
141 value: value.clone(),
142 },
143 LogEntryType::Update {
144 key,
145 old_value,
146 new_value,
147 } => reddb_file::TransactionWalEntryPayload::Update {
148 key: key.clone(),
149 old_value: old_value.clone(),
150 new_value: new_value.clone(),
151 },
152 LogEntryType::Delete { key, old_value } => {
153 reddb_file::TransactionWalEntryPayload::Delete {
154 key: key.clone(),
155 old_value: old_value.clone(),
156 }
157 }
158 LogEntryType::Checkpoint { active_txns } => {
159 reddb_file::TransactionWalEntryPayload::Checkpoint {
160 active_txns: active_txns.clone(),
161 }
162 }
163 LogEntryType::Savepoint { name } => {
164 reddb_file::TransactionWalEntryPayload::Savepoint { name: name.clone() }
165 }
166 LogEntryType::RollbackToSavepoint { name } => {
167 reddb_file::TransactionWalEntryPayload::RollbackToSavepoint { name: name.clone() }
168 }
169 LogEntryType::Compensate { original_lsn } => {
170 reddb_file::TransactionWalEntryPayload::Compensate {
171 original_lsn: *original_lsn,
172 }
173 }
174 LogEntryType::End => reddb_file::TransactionWalEntryPayload::End,
175 }
176 }
177
178 fn from_file_payload(payload: reddb_file::TransactionWalEntryPayload) -> Self {
179 match payload {
180 reddb_file::TransactionWalEntryPayload::Begin => LogEntryType::Begin,
181 reddb_file::TransactionWalEntryPayload::Commit => LogEntryType::Commit,
182 reddb_file::TransactionWalEntryPayload::Abort => LogEntryType::Abort,
183 reddb_file::TransactionWalEntryPayload::Insert { key, value } => {
184 LogEntryType::Insert { key, value }
185 }
186 reddb_file::TransactionWalEntryPayload::Update {
187 key,
188 old_value,
189 new_value,
190 } => LogEntryType::Update {
191 key,
192 old_value,
193 new_value,
194 },
195 reddb_file::TransactionWalEntryPayload::Delete { key, old_value } => {
196 LogEntryType::Delete { key, old_value }
197 }
198 reddb_file::TransactionWalEntryPayload::Checkpoint { active_txns } => {
199 LogEntryType::Checkpoint { active_txns }
200 }
201 reddb_file::TransactionWalEntryPayload::Savepoint { name } => {
202 LogEntryType::Savepoint { name }
203 }
204 reddb_file::TransactionWalEntryPayload::RollbackToSavepoint { name } => {
205 LogEntryType::RollbackToSavepoint { name }
206 }
207 reddb_file::TransactionWalEntryPayload::Compensate { original_lsn } => {
208 LogEntryType::Compensate { original_lsn }
209 }
210 reddb_file::TransactionWalEntryPayload::End => LogEntryType::End,
211 }
212 }
213}
214
/// One record of the transaction write-ahead log.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Log sequence number. `LogEntry::new` leaves it at 0; `TransactionLog::append` assigns it.
    pub lsn: Lsn,
    /// Owning transaction. `TransactionLog::checkpoint` uses id 0 for checkpoint records.
    pub txn_id: TxnId,
    /// LSN of the previous record of the same transaction, if any.
    /// `TransactionLog::append` overwrites whatever the caller supplied.
    pub prev_lsn: Option<Lsn>,
    /// Creation time in microseconds since the UNIX epoch (set by `LogEntry::new`).
    pub timestamp: Timestamp,
    /// Record kind and payload.
    pub entry_type: LogEntryType,
}
229
230impl LogEntry {
231 pub fn new(txn_id: TxnId, prev_lsn: Option<Lsn>, entry_type: LogEntryType) -> Self {
233 Self {
234 lsn: 0, txn_id,
236 prev_lsn,
237 timestamp: SystemTime::now()
238 .duration_since(UNIX_EPOCH)
239 .unwrap_or_default()
240 .as_micros() as Timestamp,
241 entry_type,
242 }
243 }
244
245 pub fn to_bytes(&self) -> Vec<u8> {
247 reddb_file::encode_transaction_wal_record_frame(&reddb_file::TransactionWalRecordFrame {
248 lsn: self.lsn,
249 txn_id: self.txn_id,
250 prev_lsn: self.prev_lsn,
251 timestamp: self.timestamp,
252 entry_type_payload: self.entry_type.to_bytes(),
253 })
254 }
255
256 pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
258 let frame = reddb_file::decode_transaction_wal_record_frame(data)
259 .map_err(transaction_wal_frame_error)?;
260 let (entry_type, consumed) = LogEntryType::from_bytes(&frame.entry_type_payload)?;
261 if consumed != frame.entry_type_payload.len() {
262 return Err(io::Error::new(
263 io::ErrorKind::InvalidData,
264 "WAL entry type length mismatch",
265 ));
266 }
267
268 Ok(Self {
269 lsn: frame.lsn,
270 txn_id: frame.txn_id,
271 prev_lsn: frame.prev_lsn,
272 timestamp: frame.timestamp,
273 entry_type,
274 })
275 }
276
277 pub fn serialized_size(&self) -> usize {
279 reddb_file::transaction_wal_record_encoded_len(self.entry_type.to_bytes().len())
280 }
281}
282
/// Configuration for a [`TransactionLog`].
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// WAL file location. An empty path makes `TransactionLog::new` skip the file and keep the
    /// log in memory only.
    pub path: PathBuf,
    /// When `true`, `TransactionLog::append` flushes and fsyncs the file after a commit record.
    pub sync_on_commit: bool,
    /// Capacity in bytes of the `BufWriter` wrapping the WAL file.
    pub buffer_size: usize,
    /// Intended maximum WAL file size in bytes.
    /// NOTE(review): not read anywhere in this file; rotation/limits are not enforced here.
    pub max_file_size: u64,
    /// Intended checkpoint interval.
    /// NOTE(review): not read anywhere in this file; its unit (records? ms?) is unconfirmed.
    pub checkpoint_interval: u64,
}
297
298impl Default for WalConfig {
299 fn default() -> Self {
300 Self {
301 path: reddb_file::layout::default_transaction_wal_path(),
302 sync_on_commit: true,
303 buffer_size: 64 * 1024, max_file_size: 100 * 1024 * 1024, checkpoint_interval: 1000,
306 }
307 }
308}
309
310impl WalConfig {
311 pub fn with_path<P: AsRef<Path>>(path: P) -> Self {
313 Self {
314 path: path.as_ref().to_path_buf(),
315 ..Default::default()
316 }
317 }
318}
319
/// Counters exposed by `TransactionLog::stats`.
#[derive(Debug, Clone, Default)]
pub struct WalStats {
    /// Number of records passed successfully through `TransactionLog::append`.
    pub entries_written: u64,
    /// Bytes appended, counting each record's frame plus its 4-byte length prefix.
    /// Also incremented for in-memory logs that have no backing file.
    pub bytes_written: u64,
    /// Number of fsyncs done by `append` for commit records when `sync_on_commit` is set.
    /// NOTE(review): the fsyncs in `checkpoint` and `flush` are not counted here.
    pub syncs: u64,
    /// Number of `TransactionLog::checkpoint` calls that completed.
    pub checkpoints: u64,
    /// Running total of record bytes (prefix included) appended through `TransactionLog::append`.
    pub file_size: u64,
}
334
/// Append-only transaction write-ahead log, optionally backed by a file.
///
/// On disk, each record is a 4-byte little-endian length followed by a `LogEntry::to_bytes`
/// frame. All methods take `&self`; shared state sits behind atomics and locks.
pub struct TransactionLog {
    /// Configuration the log was created with.
    config: WalConfig,
    /// LSN that the next `append` will hand out.
    next_lsn: AtomicU64,
    /// Buffered writer over the WAL file; `None` for in-memory logs.
    file: Option<Mutex<BufWriter<File>>>,
    /// Most recent entries, oldest first; `append` caps its length.
    buffer: RwLock<VecDeque<LogEntry>>,
    /// Last LSN written per transaction, used to chain `prev_lsn`; removed on commit/abort.
    txn_prev_lsn: RwLock<std::collections::HashMap<TxnId, Lsn>>,
    /// Write/sync/checkpoint counters.
    stats: RwLock<WalStats>,
    /// LSN of the most recent checkpoint record (0 if none).
    last_checkpoint_lsn: AtomicU64,
}
352
353impl TransactionLog {
354 pub fn new(config: WalConfig) -> io::Result<Self> {
356 let file = if config.path.as_os_str().is_empty() {
357 None
358 } else {
359 let f = OpenOptions::new()
360 .create(true)
361 .append(true)
362 .read(true)
363 .open(&config.path)?;
364 Some(Mutex::new(BufWriter::with_capacity(config.buffer_size, f)))
365 };
366
367 Ok(Self {
368 config,
369 next_lsn: AtomicU64::new(1),
370 file,
371 buffer: RwLock::new(VecDeque::new()),
372 txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
373 stats: RwLock::new(WalStats::default()),
374 last_checkpoint_lsn: AtomicU64::new(0),
375 })
376 }
377
378 pub fn in_memory() -> Self {
380 Self {
381 config: WalConfig {
382 path: PathBuf::new(),
383 ..Default::default()
384 },
385 next_lsn: AtomicU64::new(1),
386 file: None,
387 buffer: RwLock::new(VecDeque::new()),
388 txn_prev_lsn: RwLock::new(std::collections::HashMap::new()),
389 stats: RwLock::new(WalStats::default()),
390 last_checkpoint_lsn: AtomicU64::new(0),
391 }
392 }
393
394 pub fn append(&self, mut entry: LogEntry) -> io::Result<Lsn> {
396 let lsn = self.next_lsn.fetch_add(1, Ordering::SeqCst);
398 entry.lsn = lsn;
399
400 {
402 let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
403 entry.prev_lsn = prev_lsns.get(&entry.txn_id).copied();
404 prev_lsns.insert(entry.txn_id, lsn);
405 }
406
407 let bytes = entry.to_bytes();
408
409 if let Some(ref file) = self.file {
411 let mut writer = io_mutex_guard(file, "wal file")?;
412 writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
414 writer.write_all(&bytes)?;
415
416 if self.config.sync_on_commit && entry.entry_type.is_commit() {
418 writer.flush()?;
419 writer.get_mut().sync_all()?;
420
421 let mut stats = io_write_guard(&self.stats, "wal stats")?;
422 stats.syncs += 1;
423 }
424 }
425
426 {
428 let mut buffer = io_write_guard(&self.buffer, "wal buffer")?;
429 buffer.push_back(entry);
430
431 while buffer.len() > 10000 {
433 buffer.pop_front();
434 }
435 }
436
437 {
439 let mut stats = io_write_guard(&self.stats, "wal stats")?;
440 stats.entries_written += 1;
441 stats.bytes_written += bytes.len() as u64 + 4;
442 stats.file_size += bytes.len() as u64 + 4;
443 }
444
445 Ok(lsn)
446 }
447
448 pub fn log_begin(&self, txn_id: TxnId) -> io::Result<Lsn> {
450 self.append(LogEntry::new(txn_id, None, LogEntryType::Begin))
451 }
452
453 pub fn log_commit(&self, txn_id: TxnId) -> io::Result<Lsn> {
455 let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Commit))?;
456
457 {
459 let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
460 prev_lsns.remove(&txn_id);
461 }
462
463 Ok(lsn)
464 }
465
466 pub fn log_abort(&self, txn_id: TxnId) -> io::Result<Lsn> {
468 let lsn = self.append(LogEntry::new(txn_id, None, LogEntryType::Abort))?;
469
470 {
472 let mut prev_lsns = io_write_guard(&self.txn_prev_lsn, "wal prev_lsn map")?;
473 prev_lsns.remove(&txn_id);
474 }
475
476 Ok(lsn)
477 }
478
479 pub fn log_insert(&self, txn_id: TxnId, key: Vec<u8>, value: Vec<u8>) -> io::Result<Lsn> {
481 self.append(LogEntry::new(
482 txn_id,
483 None,
484 LogEntryType::Insert { key, value },
485 ))
486 }
487
488 pub fn log_update(
490 &self,
491 txn_id: TxnId,
492 key: Vec<u8>,
493 old_value: Vec<u8>,
494 new_value: Vec<u8>,
495 ) -> io::Result<Lsn> {
496 self.append(LogEntry::new(
497 txn_id,
498 None,
499 LogEntryType::Update {
500 key,
501 old_value,
502 new_value,
503 },
504 ))
505 }
506
507 pub fn log_delete(&self, txn_id: TxnId, key: Vec<u8>, old_value: Vec<u8>) -> io::Result<Lsn> {
509 self.append(LogEntry::new(
510 txn_id,
511 None,
512 LogEntryType::Delete { key, old_value },
513 ))
514 }
515
516 pub fn log_savepoint(&self, txn_id: TxnId, name: String) -> io::Result<Lsn> {
518 self.append(LogEntry::new(
519 txn_id,
520 None,
521 LogEntryType::Savepoint { name },
522 ))
523 }
524
525 pub fn checkpoint(&self, active_txns: Vec<TxnId>) -> io::Result<Lsn> {
527 let lsn = self.append(LogEntry::new(
528 0, None,
530 LogEntryType::Checkpoint { active_txns },
531 ))?;
532
533 if let Some(ref file) = self.file {
535 let mut writer = io_mutex_guard(file, "wal file")?;
536 writer.flush()?;
537 writer.get_mut().sync_all()?;
538 }
539
540 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
541
542 {
543 let mut stats = io_write_guard(&self.stats, "wal stats")?;
544 stats.checkpoints += 1;
545 }
546
547 Ok(lsn)
548 }
549
550 pub fn flush(&self) -> io::Result<()> {
552 if let Some(ref file) = self.file {
553 let mut writer = io_mutex_guard(file, "wal file")?;
554 writer.flush()?;
555 writer.get_mut().sync_all()?;
556 }
557 Ok(())
558 }
559
560 pub fn get_txn_entries(&self, txn_id: TxnId) -> Vec<LogEntry> {
562 let buffer = recover_read_guard(&self.buffer);
563 buffer
564 .iter()
565 .filter(|e| e.txn_id == txn_id)
566 .cloned()
567 .collect()
568 }
569
570 pub fn get_entries_since(&self, lsn: Lsn) -> Vec<LogEntry> {
572 let buffer = recover_read_guard(&self.buffer);
573 buffer.iter().filter(|e| e.lsn >= lsn).cloned().collect()
574 }
575
576 pub fn current_lsn(&self) -> Lsn {
578 self.next_lsn.load(Ordering::SeqCst) - 1
579 }
580
581 pub fn last_checkpoint(&self) -> Lsn {
583 self.last_checkpoint_lsn.load(Ordering::SeqCst)
584 }
585
586 pub fn stats(&self) -> WalStats {
588 recover_read_guard(&self.stats).clone()
589 }
590
591 pub fn config(&self) -> &WalConfig {
593 &self.config
594 }
595}
596
/// Sequential reader for WAL files written by `TransactionLog`.
///
/// Expects each record as a 4-byte little-endian length followed by a `LogEntry` frame.
pub struct LogReader {
    /// Buffered handle on the WAL file being read.
    reader: BufReader<File>,
}
601
602impl LogReader {
603 pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
605 let file = File::open(path)?;
606 Ok(Self {
607 reader: BufReader::new(file),
608 })
609 }
610
611 pub fn read_all(&mut self) -> io::Result<Vec<LogEntry>> {
613 let mut entries = Vec::new();
614
615 loop {
616 match self.read_entry() {
617 Ok(entry) => entries.push(entry),
618 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
619 Err(e) => return Err(e),
620 }
621 }
622
623 Ok(entries)
624 }
625
626 pub fn read_entry(&mut self) -> io::Result<LogEntry> {
628 let mut len_buf = [0u8; 4];
629 self.reader.read_exact(&mut len_buf)?;
630 let len = u32::from_le_bytes(len_buf) as usize;
631
632 let mut data = vec![0u8; len];
633 self.reader.read_exact(&mut data)?;
634
635 LogEntry::from_bytes(&data)
636 }
637}
638
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_log_entry_serialize() {
        let entry = LogEntry {
            lsn: 42,
            txn_id: 1,
            prev_lsn: Some(40),
            timestamp: 1234567890,
            entry_type: LogEntryType::Insert {
                key: b"key1".to_vec(),
                value: b"value1".to_vec(),
            },
        };

        let bytes = entry.to_bytes();
        let recovered = LogEntry::from_bytes(&bytes).unwrap();

        assert_eq!(recovered.lsn, entry.lsn);
        assert_eq!(recovered.txn_id, entry.txn_id);
        assert_eq!(recovered.prev_lsn, entry.prev_lsn);
        assert_eq!(recovered.timestamp, entry.timestamp);
        assert_eq!(recovered.entry_type, entry.entry_type);
    }

    #[test]
    fn test_in_memory_log() {
        let log = TransactionLog::in_memory();

        let lsn1 = log.log_begin(1).unwrap();
        let lsn2 = log
            .log_insert(1, b"key".to_vec(), b"value".to_vec())
            .unwrap();
        let lsn3 = log.log_commit(1).unwrap();

        assert_eq!(lsn1, 1);
        assert_eq!(lsn2, 2);
        assert_eq!(lsn3, 3);

        let entries = log.get_txn_entries(1);
        assert_eq!(entries.len(), 3);
    }

    #[test]
    fn test_checkpoint() {
        let log = TransactionLog::in_memory();

        log.log_begin(1).unwrap();
        log.log_begin(2).unwrap();

        let cp_lsn = log.checkpoint(vec![1, 2]).unwrap();
        assert_eq!(log.last_checkpoint(), cp_lsn);
    }

    #[test]
    fn test_log_entry_types() {
        // Every variant must survive a round trip and consume exactly its own bytes,
        // because `LogEntry::from_bytes` rejects payloads with leftover bytes.
        let types = vec![
            LogEntryType::Begin,
            LogEntryType::Commit,
            LogEntryType::Abort,
            LogEntryType::Insert {
                key: b"k".to_vec(),
                value: b"v".to_vec(),
            },
            LogEntryType::Update {
                key: b"k".to_vec(),
                old_value: b"old".to_vec(),
                new_value: b"new".to_vec(),
            },
            LogEntryType::Delete {
                key: b"k".to_vec(),
                old_value: b"v".to_vec(),
            },
            LogEntryType::Checkpoint {
                active_txns: vec![1, 2, 3],
            },
            LogEntryType::Savepoint {
                name: "sp1".to_string(),
            },
            LogEntryType::RollbackToSavepoint {
                name: "sp1".to_string(),
            },
            LogEntryType::Compensate { original_lsn: 17 },
            LogEntryType::End,
        ];

        for t in types {
            let bytes = t.to_bytes();
            let (recovered, consumed) = LogEntryType::from_bytes(&bytes).unwrap();
            assert_eq!(recovered, t);
            assert_eq!(consumed, bytes.len());
        }
    }

    #[test]
    fn test_entry_type_predicates() {
        assert!(LogEntryType::Commit.is_commit());
        assert!(!LogEntryType::Abort.is_commit());
        assert!(LogEntryType::Abort.is_abort());
        assert!(!LogEntryType::Commit.is_abort());

        assert!(LogEntryType::Insert {
            key: b"k".to_vec(),
            value: b"v".to_vec(),
        }
        .is_data_modification());
        assert!(LogEntryType::Update {
            key: b"k".to_vec(),
            old_value: b"a".to_vec(),
            new_value: b"b".to_vec(),
        }
        .is_data_modification());
        assert!(LogEntryType::Delete {
            key: b"k".to_vec(),
            old_value: b"v".to_vec(),
        }
        .is_data_modification());
        assert!(!LogEntryType::Begin.is_data_modification());
        assert!(!LogEntryType::Checkpoint {
            active_txns: Vec::new(),
        }
        .is_data_modification());
    }

    #[test]
    fn test_prev_lsn_chain() {
        let log = TransactionLog::in_memory();

        log.log_begin(1).unwrap();
        log.log_insert(1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
        log.log_insert(1, b"k2".to_vec(), b"v2".to_vec()).unwrap();

        let entries = log.get_txn_entries(1);
        assert_eq!(entries[0].prev_lsn, None);
        assert_eq!(entries[1].prev_lsn, Some(1));
        assert_eq!(entries[2].prev_lsn, Some(2));
    }

    #[test]
    fn test_commit_resets_prev_lsn_chain() {
        let log = TransactionLog::in_memory();

        log.log_begin(1).unwrap();
        log.log_commit(1).unwrap();
        // Reusing the id after commit must start a fresh chain.
        log.log_begin(1).unwrap();

        let entries = log.get_txn_entries(1);
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[1].prev_lsn, Some(1));
        assert_eq!(entries[2].prev_lsn, None);
    }

    #[test]
    fn test_log_entry_type_rejects_truncated_insert() {
        let err = LogEntryType::from_bytes(&[3, 4, 0, 0, 0, b'k'])
            .expect_err("truncated insert should fail");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_log_entry_rejects_truncated_type_payload() {
        let entry = LogEntry {
            lsn: 7,
            txn_id: 9,
            prev_lsn: Some(3),
            timestamp: 42,
            entry_type: LogEntryType::Insert {
                key: b"hello".to_vec(),
                value: b"world".to_vec(),
            },
        };

        let mut bytes = entry.to_bytes();
        bytes.truncate(bytes.len() - 2);

        let err = LogEntry::from_bytes(&bytes).expect_err("truncated entry should fail");
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }
}