1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
67use std::fs::{File, OpenOptions};
68use std::io::{Read, Seek, SeekFrom, Write};
69use std::path::{Path, PathBuf};
70use uuid::Uuid;
71
72use sochdb_core::{Result, SochDBError};
73
/// Magic number written as the first 8 bytes of the WAL header.
///
/// Read most-significant byte first, its non-zero bytes are the ASCII
/// characters `SOCHDBW`.
const WAL_MAGIC: u64 = 0x534F43_48444257;

/// Size in bytes of the on-disk header region at the start of the file.
///
/// `WalHeader::write_to` pads the encoded fields with zeros up to this size,
/// and entry data starts at this offset.
pub const WAL_HEADER_SIZE: usize = 64;

/// Format version stamped into every header constructed by this module.
const WAL_VERSION: u16 = 1;
86
/// In-memory form of the fixed-size header stored at offset 0 of a WAL file.
///
/// `WalHeader::write_to` serializes the fields little-endian in declaration
/// order and `WalHeader::read_from` parses them back.
#[derive(Debug, Clone)]
pub struct WalHeader {
    /// File signature; `read_from` rejects any value other than `WAL_MAGIC`.
    pub magic: u64,
    /// Format version; both constructors set it to `WAL_VERSION`.
    pub version: u16,
    /// Flag bits; both constructors set this to 0 and nothing in this file
    /// interprets it.
    pub flags: u16,
    /// Writer generation: 1 for a fresh header, `previous.epoch + 1` when
    /// built with `new_epoch`.
    pub epoch: u64,
    /// Random (v4) UUID generated each time a header is constructed.
    pub writer_id: Uuid,
    /// LSN most recently recorded through `update_last_commit`.
    pub last_commit_lsn: u64,
    /// CRC most recently recorded through `update_last_entry_crc`.
    pub last_entry_crc: u32,
    /// Entry counter; starts at 0 in both constructors and is incremented by
    /// `update_last_entry_crc`.
    pub entry_count: u64,
    /// Header checksum as read from disk. `write_to` ignores this field and
    /// writes a freshly computed CRC instead.
    pub header_crc: u32,
}
116
117impl WalHeader {
118 pub fn new() -> Self {
120 Self {
121 magic: WAL_MAGIC,
122 version: WAL_VERSION,
123 flags: 0,
124 epoch: 1,
125 writer_id: Uuid::new_v4(),
126 last_commit_lsn: 0,
127 last_entry_crc: 0,
128 entry_count: 0,
129 header_crc: 0,
130 }
131 }
132
133 pub fn new_epoch(previous: &WalHeader) -> Self {
135 Self {
136 magic: WAL_MAGIC,
137 version: WAL_VERSION,
138 flags: 0,
139 epoch: previous.epoch + 1,
140 writer_id: Uuid::new_v4(),
141 last_commit_lsn: previous.last_commit_lsn,
142 last_entry_crc: previous.last_entry_crc,
143 entry_count: 0,
144 header_crc: 0,
145 }
146 }
147
148 fn compute_crc(&self) -> u32 {
150 let mut hasher = crc32fast::Hasher::new();
151 hasher.update(&self.magic.to_le_bytes());
152 hasher.update(&self.version.to_le_bytes());
153 hasher.update(&self.flags.to_le_bytes());
154 hasher.update(&self.epoch.to_le_bytes());
155 hasher.update(self.writer_id.as_bytes());
156 hasher.update(&self.last_commit_lsn.to_le_bytes());
157 hasher.update(&self.last_entry_crc.to_le_bytes());
158 hasher.update(&self.entry_count.to_le_bytes());
159 hasher.finalize()
160 }
161
162 pub fn read_from(file: &mut File) -> Result<Self> {
164 file.seek(SeekFrom::Start(0))?;
165
166 let magic = file.read_u64::<LittleEndian>()?;
167 if magic != WAL_MAGIC {
168 return Err(SochDBError::Corruption(format!(
169 "Invalid WAL magic: expected {:x}, got {:x}",
170 WAL_MAGIC, magic
171 )));
172 }
173
174 let version = file.read_u16::<LittleEndian>()?;
175 let flags = file.read_u16::<LittleEndian>()?;
176 let epoch = file.read_u64::<LittleEndian>()?;
177
178 let mut writer_id_bytes = [0u8; 16];
179 file.read_exact(&mut writer_id_bytes)?;
180 let writer_id = Uuid::from_bytes(writer_id_bytes);
181
182 let last_commit_lsn = file.read_u64::<LittleEndian>()?;
183 let last_entry_crc = file.read_u32::<LittleEndian>()?;
184 let entry_count = file.read_u64::<LittleEndian>()?;
185 let header_crc = file.read_u32::<LittleEndian>()?;
186
187 let header = Self {
188 magic,
189 version,
190 flags,
191 epoch,
192 writer_id,
193 last_commit_lsn,
194 last_entry_crc,
195 entry_count,
196 header_crc,
197 };
198
199 let computed_crc = header.compute_crc();
201 if computed_crc != header_crc {
202 return Err(SochDBError::Corruption(format!(
203 "WAL header CRC mismatch: expected {:x}, got {:x}",
204 computed_crc, header_crc
205 )));
206 }
207
208 Ok(header)
209 }
210
211 pub fn write_to(&self, file: &mut File) -> Result<()> {
213 file.seek(SeekFrom::Start(0))?;
214
215 file.write_u64::<LittleEndian>(self.magic)?;
216 file.write_u16::<LittleEndian>(self.version)?;
217 file.write_u16::<LittleEndian>(self.flags)?;
218 file.write_u64::<LittleEndian>(self.epoch)?;
219 file.write_all(self.writer_id.as_bytes())?;
220 file.write_u64::<LittleEndian>(self.last_commit_lsn)?;
221 file.write_u32::<LittleEndian>(self.last_entry_crc)?;
222 file.write_u64::<LittleEndian>(self.entry_count)?;
223
224 let crc = self.compute_crc();
226 file.write_u32::<LittleEndian>(crc)?;
227
228 let written = 8 + 2 + 2 + 8 + 16 + 8 + 4 + 8 + 4; let padding = WAL_HEADER_SIZE - written;
231 file.write_all(&vec![0u8; padding])?;
232
233 file.sync_all()?;
234 Ok(())
235 }
236
237 pub fn update_last_entry_crc(&mut self, crc: u32) {
239 self.last_entry_crc = crc;
240 self.entry_count += 1;
241 }
242
243 pub fn update_last_commit(&mut self, lsn: u64) {
245 self.last_commit_lsn = lsn;
246 }
247}
248
249impl Default for WalHeader {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
/// A single log record.
///
/// Encoded by `to_bytes` as, in order and little-endian: `lsn` (u64),
/// `prev_crc` (u32), `epoch` (u64), payload length (u32), the payload bytes,
/// and finally `crc` (u32).
#[derive(Debug, Clone)]
pub struct FencedWalEntry {
    /// Log sequence number of this entry.
    pub lsn: u64,
    /// CRC value supplied by the writer for the preceding entry; readers
    /// compare it against the previous entry's `crc`.
    pub prev_crc: u32,
    /// Epoch value supplied by the writer when the entry was created.
    pub epoch: u64,
    /// Opaque entry body.
    pub payload: Vec<u8>,
    /// CRC32 over `lsn`, `prev_crc`, `epoch`, the payload length and the
    /// payload, as produced by `compute_crc`.
    pub crc: u32,
}
280
281impl FencedWalEntry {
282 const HEADER_SIZE: usize = 8 + 4 + 8 + 4; const FOOTER_SIZE: usize = 4; pub fn new(lsn: u64, prev_crc: u32, epoch: u64, payload: Vec<u8>) -> Self {
289 let mut entry = Self {
290 lsn,
291 prev_crc,
292 epoch,
293 payload,
294 crc: 0,
295 };
296 entry.crc = entry.compute_crc();
297 entry
298 }
299
300 fn compute_crc(&self) -> u32 {
302 let mut hasher = crc32fast::Hasher::new();
303 hasher.update(&self.lsn.to_le_bytes());
304 hasher.update(&self.prev_crc.to_le_bytes());
305 hasher.update(&self.epoch.to_le_bytes());
306 hasher.update(&(self.payload.len() as u32).to_le_bytes());
307 hasher.update(&self.payload);
308 hasher.finalize()
309 }
310
311 pub fn to_bytes(&self) -> Vec<u8> {
313 let total_len = Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE;
314 let mut buf = Vec::with_capacity(total_len);
315
316 buf.extend_from_slice(&self.lsn.to_le_bytes());
317 buf.extend_from_slice(&self.prev_crc.to_le_bytes());
318 buf.extend_from_slice(&self.epoch.to_le_bytes());
319 buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
320 buf.extend_from_slice(&self.payload);
321 buf.extend_from_slice(&self.crc.to_le_bytes());
322
323 buf
324 }
325
326 pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
328 let lsn = reader.read_u64::<LittleEndian>()?;
329 let prev_crc = reader.read_u32::<LittleEndian>()?;
330 let epoch = reader.read_u64::<LittleEndian>()?;
331 let payload_len = reader.read_u32::<LittleEndian>()? as usize;
332
333 let mut payload = vec![0u8; payload_len];
334 reader.read_exact(&mut payload)?;
335
336 let crc = reader.read_u32::<LittleEndian>()?;
337
338 let entry = Self {
339 lsn,
340 prev_crc,
341 epoch,
342 payload,
343 crc,
344 };
345
346 let computed_crc = entry.compute_crc();
348 if computed_crc != crc {
349 return Err(SochDBError::Corruption(format!(
350 "WAL entry CRC mismatch at LSN {}: expected {:x}, got {:x}",
351 lsn, computed_crc, crc
352 )));
353 }
354
355 Ok(entry)
356 }
357
358 pub fn size(&self) -> usize {
360 Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE
361 }
362}
363
/// Handle to an open WAL file: the file itself, the in-memory copy of its
/// header, and the offset at which the next entry will be written.
pub struct FencedWal {
    /// Path the log was opened with.
    path: PathBuf,
    /// In-memory header; persisted to the file by `sync`.
    header: WalHeader,
    /// Underlying file, opened for reading and writing.
    file: File,
    /// Byte offset at which `append` writes the next entry.
    write_pos: u64,
}
384
385impl FencedWal {
386 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
391 let path = path.as_ref().to_path_buf();
392
393 if let Some(parent) = path.parent() {
395 std::fs::create_dir_all(parent)?;
396 }
397
398 let file_exists = path.exists();
399 let mut file = OpenOptions::new()
400 .create(true)
401 .read(true)
402 .write(true)
403 .open(&path)?;
404
405 let (header, write_pos) = if file_exists && file.metadata()?.len() >= WAL_HEADER_SIZE as u64
406 {
407 let existing_header = WalHeader::read_from(&mut file)?;
409
410 let new_header = WalHeader::new_epoch(&existing_header);
412 new_header.write_to(&mut file)?;
413
414 let write_pos = Self::find_write_position(&mut file, &existing_header)?;
416
417 (new_header, write_pos)
418 } else {
419 let header = WalHeader::new();
421 header.write_to(&mut file)?;
422 (header, WAL_HEADER_SIZE as u64)
423 };
424
425 Ok(Self {
426 path,
427 header,
428 file,
429 write_pos,
430 })
431 }
432
433 fn find_write_position(file: &mut File, header: &WalHeader) -> Result<u64> {
435 file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
436
437 let mut pos = WAL_HEADER_SIZE as u64;
438 let mut prev_crc = 0u32;
439 let mut entries_verified = 0u64;
440
441 loop {
442 match FencedWalEntry::read_from(file) {
443 Ok(entry) => {
444 if entries_verified > 0 && entry.prev_crc != prev_crc {
446 eprintln!(
448 "WAL chain broken at LSN {}: expected prev_crc {:x}, got {:x}",
449 entry.lsn, prev_crc, entry.prev_crc
450 );
451 break;
452 }
453
454 if entry.epoch > header.epoch {
456 return Err(SochDBError::SplitBrain(format!(
458 "Entry has future epoch {} > header epoch {}",
459 entry.epoch, header.epoch
460 )));
461 }
462
463 prev_crc = entry.crc;
464 pos += entry.size() as u64;
465 entries_verified += 1;
466 }
467 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
468 break;
469 }
470 Err(SochDBError::Corruption(_)) => {
471 break;
473 }
474 Err(e) => return Err(e),
475 }
476 }
477
478 file.set_len(pos)?;
480 file.seek(SeekFrom::Start(pos))?;
481
482 Ok(pos)
483 }
484
485 pub fn append(&mut self, payload: Vec<u8>) -> Result<u64> {
487 let lsn = self.header.entry_count + 1;
488 let entry = FencedWalEntry::new(
489 lsn,
490 self.header.last_entry_crc,
491 self.header.epoch,
492 payload,
493 );
494
495 let bytes = entry.to_bytes();
496 self.file.seek(SeekFrom::Start(self.write_pos))?;
497 self.file.write_all(&bytes)?;
498
499 self.write_pos += bytes.len() as u64;
500 self.header.update_last_entry_crc(entry.crc);
501
502 Ok(lsn)
503 }
504
505 pub fn sync(&mut self) -> Result<()> {
507 self.file.sync_all()?;
508 self.header.write_to(&mut self.file)?;
509 Ok(())
510 }
511
512 pub fn commit(&mut self, lsn: u64) -> Result<()> {
514 self.header.update_last_commit(lsn);
515 self.sync()
516 }
517
518 pub fn epoch(&self) -> u64 {
520 self.header.epoch
521 }
522
523 pub fn writer_id(&self) -> Uuid {
525 self.header.writer_id
526 }
527
528 pub fn last_commit_lsn(&self) -> u64 {
530 self.header.last_commit_lsn
531 }
532
533 pub fn entry_count(&self) -> u64 {
535 self.header.entry_count
536 }
537
538 pub fn replay<F>(&mut self, mut callback: F) -> Result<u64>
540 where
541 F: FnMut(&FencedWalEntry) -> Result<()>,
542 {
543 self.file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
544
545 let mut prev_crc = 0u32;
546 let mut count = 0u64;
547
548 loop {
549 match FencedWalEntry::read_from(&mut self.file) {
550 Ok(entry) => {
551 if count > 0 && entry.prev_crc != prev_crc {
553 return Err(SochDBError::Corruption(format!(
554 "Chain broken at LSN {}: expected {:x}, got {:x}",
555 entry.lsn, prev_crc, entry.prev_crc
556 )));
557 }
558
559 callback(&entry)?;
560 prev_crc = entry.crc;
561 count += 1;
562 }
563 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
564 break;
565 }
566 Err(e) => return Err(e),
567 }
568 }
569
570 Ok(count)
571 }
572
573 pub fn replay_committed<F>(&mut self, callback: F) -> Result<u64>
575 where
576 F: FnMut(&FencedWalEntry) -> Result<()>,
577 {
578 let commit_lsn = self.header.last_commit_lsn;
579 let mut wrapped_callback = callback;
580 let mut committed_count = 0u64;
581
582 self.replay(|entry| {
583 if entry.lsn <= commit_lsn {
584 wrapped_callback(entry)?;
585 committed_count += 1;
586 }
587 Ok(())
588 })?;
589
590 Ok(committed_count)
591 }
592}
593
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a temporary directory and the path of a WAL file inside it.
    /// The directory guard is returned so it outlives the test body.
    fn scratch_wal() -> (TempDir, std::path::PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.wal");
        (dir, path)
    }

    /// A header written to disk reads back with the same identifying fields.
    #[test]
    fn test_header_roundtrip() {
        let (_dir, path) = scratch_wal();
        let written = WalHeader::new();

        let mut out = File::create(&path).unwrap();
        written.write_to(&mut out).unwrap();
        drop(out);

        let mut input = File::open(&path).unwrap();
        let loaded = WalHeader::read_from(&mut input).unwrap();

        assert_eq!(loaded.magic, written.magic);
        assert_eq!(loaded.epoch, written.epoch);
        assert_eq!(loaded.writer_id, written.writer_id);
    }

    /// Reopening an existing log bumps the epoch and issues a new writer id.
    #[test]
    fn test_epoch_increment() {
        let (_dir, path) = scratch_wal();

        let (epoch1, writer1) = {
            let first = FencedWal::open(&path).unwrap();
            (first.epoch(), first.writer_id())
        };

        let second = FencedWal::open(&path).unwrap();

        assert_eq!(second.epoch(), epoch1 + 1);
        assert_ne!(writer1, second.writer_id());
    }

    /// Appended entries are replayed in order with their payloads intact.
    #[test]
    fn test_entry_chain() {
        let (_dir, path) = scratch_wal();
        let mut wal = FencedWal::open(&path).unwrap();

        for payload in [&b"entry1"[..], &b"entry2"[..], &b"entry3"[..]] {
            wal.append(payload.to_vec()).unwrap();
        }
        wal.sync().unwrap();

        let mut seen: Vec<Vec<u8>> = Vec::new();
        wal.replay(|entry| {
            seen.push(entry.payload.clone());
            Ok(())
        })
        .unwrap();

        let expected = vec![b"entry1".to_vec(), b"entry2".to_vec(), b"entry3".to_vec()];
        assert_eq!(seen, expected);
    }

    /// After a reopen, only entries up to the committed LSN are replayed by
    /// `replay_committed`.
    #[test]
    fn test_commit_replay() {
        let (_dir, path) = scratch_wal();

        {
            let mut writer = FencedWal::open(&path).unwrap();
            writer.append(b"committed1".to_vec()).unwrap();
            writer.append(b"committed2".to_vec()).unwrap();
            writer.commit(2).unwrap();
            writer.append(b"uncommitted".to_vec()).unwrap();
            writer.sync().unwrap();
        }

        let mut reopened = FencedWal::open(&path).unwrap();
        let mut seen: Vec<Vec<u8>> = Vec::new();
        reopened
            .replay_committed(|entry| {
                seen.push(entry.payload.clone());
                Ok(())
            })
            .unwrap();

        let expected = vec![b"committed1".to_vec(), b"committed2".to_vec()];
        assert_eq!(seen, expected);
    }
}