1use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64use std::fs::{File, OpenOptions};
65use std::io::{Read, Seek, SeekFrom, Write};
66use std::path::{Path, PathBuf};
67use uuid::Uuid;
68
69use sochdb_core::{Result, SochDBError};
70
/// Magic number stored in the first header field; its hex digits spell the
/// ASCII bytes "SOCHDBW".
const WAL_MAGIC: u64 = 0x534F43_48444257;

/// Size in bytes of the on-disk header region; log entries start at this offset.
pub const WAL_HEADER_SIZE: usize = 64;

/// Format version stamped into every header created by this module.
const WAL_VERSION: u16 = 1;
83
/// Header stored at offset 0 of a WAL file.
///
/// The fields are serialised little-endian in declaration order and the
/// region is zero-padded up to `WAL_HEADER_SIZE` bytes.
#[derive(Debug, Clone)]
pub struct WalHeader {
    /// File magic; must equal `WAL_MAGIC` for the header to be accepted on read.
    pub magic: u64,
    /// Format version of the file.
    pub version: u16,
    /// Flag bits. Headers built in this file always use 0; no flag is
    /// interpreted here.
    pub flags: u16,
    /// Writer epoch; `new` starts at 1 and `new_epoch` adds one.
    pub epoch: u64,
    /// Random (v4) UUID generated for each header that is constructed.
    pub writer_id: Uuid,
    /// LSN most recently passed to `update_last_commit`.
    pub last_commit_lsn: u64,
    /// CRC most recently passed to `update_last_entry_crc`.
    pub last_entry_crc: u32,
    /// Number of times `update_last_entry_crc` has been called on this header.
    pub entry_count: u64,
    /// CRC value as read from disk; 0 in freshly constructed headers.
    /// `write_to` recomputes the CRC instead of writing this field.
    pub header_crc: u32,
}
113
114impl WalHeader {
115 pub fn new() -> Self {
117 Self {
118 magic: WAL_MAGIC,
119 version: WAL_VERSION,
120 flags: 0,
121 epoch: 1,
122 writer_id: Uuid::new_v4(),
123 last_commit_lsn: 0,
124 last_entry_crc: 0,
125 entry_count: 0,
126 header_crc: 0,
127 }
128 }
129
130 pub fn new_epoch(previous: &WalHeader) -> Self {
132 Self {
133 magic: WAL_MAGIC,
134 version: WAL_VERSION,
135 flags: 0,
136 epoch: previous.epoch + 1,
137 writer_id: Uuid::new_v4(),
138 last_commit_lsn: previous.last_commit_lsn,
139 last_entry_crc: previous.last_entry_crc,
140 entry_count: 0,
141 header_crc: 0,
142 }
143 }
144
145 fn compute_crc(&self) -> u32 {
147 let mut hasher = crc32fast::Hasher::new();
148 hasher.update(&self.magic.to_le_bytes());
149 hasher.update(&self.version.to_le_bytes());
150 hasher.update(&self.flags.to_le_bytes());
151 hasher.update(&self.epoch.to_le_bytes());
152 hasher.update(self.writer_id.as_bytes());
153 hasher.update(&self.last_commit_lsn.to_le_bytes());
154 hasher.update(&self.last_entry_crc.to_le_bytes());
155 hasher.update(&self.entry_count.to_le_bytes());
156 hasher.finalize()
157 }
158
159 pub fn read_from(file: &mut File) -> Result<Self> {
161 file.seek(SeekFrom::Start(0))?;
162
163 let magic = file.read_u64::<LittleEndian>()?;
164 if magic != WAL_MAGIC {
165 return Err(SochDBError::Corruption(format!(
166 "Invalid WAL magic: expected {:x}, got {:x}",
167 WAL_MAGIC, magic
168 )));
169 }
170
171 let version = file.read_u16::<LittleEndian>()?;
172 let flags = file.read_u16::<LittleEndian>()?;
173 let epoch = file.read_u64::<LittleEndian>()?;
174
175 let mut writer_id_bytes = [0u8; 16];
176 file.read_exact(&mut writer_id_bytes)?;
177 let writer_id = Uuid::from_bytes(writer_id_bytes);
178
179 let last_commit_lsn = file.read_u64::<LittleEndian>()?;
180 let last_entry_crc = file.read_u32::<LittleEndian>()?;
181 let entry_count = file.read_u64::<LittleEndian>()?;
182 let header_crc = file.read_u32::<LittleEndian>()?;
183
184 let header = Self {
185 magic,
186 version,
187 flags,
188 epoch,
189 writer_id,
190 last_commit_lsn,
191 last_entry_crc,
192 entry_count,
193 header_crc,
194 };
195
196 let computed_crc = header.compute_crc();
198 if computed_crc != header_crc {
199 return Err(SochDBError::Corruption(format!(
200 "WAL header CRC mismatch: expected {:x}, got {:x}",
201 computed_crc, header_crc
202 )));
203 }
204
205 Ok(header)
206 }
207
208 pub fn write_to(&self, file: &mut File) -> Result<()> {
210 file.seek(SeekFrom::Start(0))?;
211
212 file.write_u64::<LittleEndian>(self.magic)?;
213 file.write_u16::<LittleEndian>(self.version)?;
214 file.write_u16::<LittleEndian>(self.flags)?;
215 file.write_u64::<LittleEndian>(self.epoch)?;
216 file.write_all(self.writer_id.as_bytes())?;
217 file.write_u64::<LittleEndian>(self.last_commit_lsn)?;
218 file.write_u32::<LittleEndian>(self.last_entry_crc)?;
219 file.write_u64::<LittleEndian>(self.entry_count)?;
220
221 let crc = self.compute_crc();
223 file.write_u32::<LittleEndian>(crc)?;
224
225 let written = 8 + 2 + 2 + 8 + 16 + 8 + 4 + 8 + 4; let padding = WAL_HEADER_SIZE - written;
228 file.write_all(&vec![0u8; padding])?;
229
230 file.sync_all()?;
231 Ok(())
232 }
233
234 pub fn update_last_entry_crc(&mut self, crc: u32) {
236 self.last_entry_crc = crc;
237 self.entry_count += 1;
238 }
239
240 pub fn update_last_commit(&mut self, lsn: u64) {
242 self.last_commit_lsn = lsn;
243 }
244}
245
246impl Default for WalHeader {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
/// One log record.
///
/// Serialised as `lsn`, `prev_crc`, `epoch`, a `u32` payload length, the
/// payload bytes and finally `crc`, all integers little-endian.
#[derive(Debug, Clone)]
pub struct FencedWalEntry {
    /// Log sequence number of this entry.
    pub lsn: u64,
    /// CRC of the entry that precedes this one in the chain.
    pub prev_crc: u32,
    /// Epoch of the writer that produced this entry.
    pub epoch: u64,
    /// Opaque payload bytes.
    pub payload: Vec<u8>,
    /// CRC32 over `lsn`, `prev_crc`, `epoch`, the payload length and the payload.
    pub crc: u32,
}
277
278impl FencedWalEntry {
279 const HEADER_SIZE: usize = 8 + 4 + 8 + 4; const FOOTER_SIZE: usize = 4; pub fn new(lsn: u64, prev_crc: u32, epoch: u64, payload: Vec<u8>) -> Self {
286 let mut entry = Self {
287 lsn,
288 prev_crc,
289 epoch,
290 payload,
291 crc: 0,
292 };
293 entry.crc = entry.compute_crc();
294 entry
295 }
296
297 fn compute_crc(&self) -> u32 {
299 let mut hasher = crc32fast::Hasher::new();
300 hasher.update(&self.lsn.to_le_bytes());
301 hasher.update(&self.prev_crc.to_le_bytes());
302 hasher.update(&self.epoch.to_le_bytes());
303 hasher.update(&(self.payload.len() as u32).to_le_bytes());
304 hasher.update(&self.payload);
305 hasher.finalize()
306 }
307
308 pub fn to_bytes(&self) -> Vec<u8> {
310 let total_len = Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE;
311 let mut buf = Vec::with_capacity(total_len);
312
313 buf.extend_from_slice(&self.lsn.to_le_bytes());
314 buf.extend_from_slice(&self.prev_crc.to_le_bytes());
315 buf.extend_from_slice(&self.epoch.to_le_bytes());
316 buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
317 buf.extend_from_slice(&self.payload);
318 buf.extend_from_slice(&self.crc.to_le_bytes());
319
320 buf
321 }
322
323 pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
325 let lsn = reader.read_u64::<LittleEndian>()?;
326 let prev_crc = reader.read_u32::<LittleEndian>()?;
327 let epoch = reader.read_u64::<LittleEndian>()?;
328 let payload_len = reader.read_u32::<LittleEndian>()? as usize;
329
330 let mut payload = vec![0u8; payload_len];
331 reader.read_exact(&mut payload)?;
332
333 let crc = reader.read_u32::<LittleEndian>()?;
334
335 let entry = Self {
336 lsn,
337 prev_crc,
338 epoch,
339 payload,
340 crc,
341 };
342
343 let computed_crc = entry.compute_crc();
345 if computed_crc != crc {
346 return Err(SochDBError::Corruption(format!(
347 "WAL entry CRC mismatch at LSN {}: expected {:x}, got {:x}",
348 lsn, computed_crc, crc
349 )));
350 }
351
352 Ok(entry)
353 }
354
355 pub fn size(&self) -> usize {
357 Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE
358 }
359}
360
/// Handle to an open WAL file together with its in-memory header.
pub struct FencedWal {
    /// Path the log was opened with.
    path: PathBuf,
    /// In-memory header; written back to the file by `sync`.
    header: WalHeader,
    /// Underlying file, opened for reading and writing.
    file: File,
    /// Byte offset at which the next appended entry is written.
    write_pos: u64,
}
381
382impl FencedWal {
383 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
388 let path = path.as_ref().to_path_buf();
389
390 if let Some(parent) = path.parent() {
392 std::fs::create_dir_all(parent)?;
393 }
394
395 let file_exists = path.exists();
396 let mut file = OpenOptions::new()
397 .create(true)
398 .read(true)
399 .write(true)
400 .open(&path)?;
401
402 let (header, write_pos) = if file_exists && file.metadata()?.len() >= WAL_HEADER_SIZE as u64
403 {
404 let existing_header = WalHeader::read_from(&mut file)?;
406
407 let new_header = WalHeader::new_epoch(&existing_header);
409 new_header.write_to(&mut file)?;
410
411 let write_pos = Self::find_write_position(&mut file, &existing_header)?;
413
414 (new_header, write_pos)
415 } else {
416 let header = WalHeader::new();
418 header.write_to(&mut file)?;
419 (header, WAL_HEADER_SIZE as u64)
420 };
421
422 Ok(Self {
423 path,
424 header,
425 file,
426 write_pos,
427 })
428 }
429
430 fn find_write_position(file: &mut File, header: &WalHeader) -> Result<u64> {
432 file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
433
434 let mut pos = WAL_HEADER_SIZE as u64;
435 let mut prev_crc = 0u32;
436 let mut entries_verified = 0u64;
437
438 loop {
439 match FencedWalEntry::read_from(file) {
440 Ok(entry) => {
441 if entries_verified > 0 && entry.prev_crc != prev_crc {
443 eprintln!(
445 "WAL chain broken at LSN {}: expected prev_crc {:x}, got {:x}",
446 entry.lsn, prev_crc, entry.prev_crc
447 );
448 break;
449 }
450
451 if entry.epoch > header.epoch {
453 return Err(SochDBError::SplitBrain(format!(
455 "Entry has future epoch {} > header epoch {}",
456 entry.epoch, header.epoch
457 )));
458 }
459
460 prev_crc = entry.crc;
461 pos += entry.size() as u64;
462 entries_verified += 1;
463 }
464 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
465 break;
466 }
467 Err(SochDBError::Corruption(_)) => {
468 break;
470 }
471 Err(e) => return Err(e),
472 }
473 }
474
475 file.set_len(pos)?;
477 file.seek(SeekFrom::Start(pos))?;
478
479 Ok(pos)
480 }
481
482 pub fn append(&mut self, payload: Vec<u8>) -> Result<u64> {
484 let lsn = self.header.entry_count + 1;
485 let entry = FencedWalEntry::new(
486 lsn,
487 self.header.last_entry_crc,
488 self.header.epoch,
489 payload,
490 );
491
492 let bytes = entry.to_bytes();
493 self.file.seek(SeekFrom::Start(self.write_pos))?;
494 self.file.write_all(&bytes)?;
495
496 self.write_pos += bytes.len() as u64;
497 self.header.update_last_entry_crc(entry.crc);
498
499 Ok(lsn)
500 }
501
502 pub fn sync(&mut self) -> Result<()> {
504 self.file.sync_all()?;
505 self.header.write_to(&mut self.file)?;
506 Ok(())
507 }
508
509 pub fn commit(&mut self, lsn: u64) -> Result<()> {
511 self.header.update_last_commit(lsn);
512 self.sync()
513 }
514
515 pub fn epoch(&self) -> u64 {
517 self.header.epoch
518 }
519
520 pub fn writer_id(&self) -> Uuid {
522 self.header.writer_id
523 }
524
525 pub fn last_commit_lsn(&self) -> u64 {
527 self.header.last_commit_lsn
528 }
529
530 pub fn entry_count(&self) -> u64 {
532 self.header.entry_count
533 }
534
535 pub fn replay<F>(&mut self, mut callback: F) -> Result<u64>
537 where
538 F: FnMut(&FencedWalEntry) -> Result<()>,
539 {
540 self.file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
541
542 let mut prev_crc = 0u32;
543 let mut count = 0u64;
544
545 loop {
546 match FencedWalEntry::read_from(&mut self.file) {
547 Ok(entry) => {
548 if count > 0 && entry.prev_crc != prev_crc {
550 return Err(SochDBError::Corruption(format!(
551 "Chain broken at LSN {}: expected {:x}, got {:x}",
552 entry.lsn, prev_crc, entry.prev_crc
553 )));
554 }
555
556 callback(&entry)?;
557 prev_crc = entry.crc;
558 count += 1;
559 }
560 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
561 break;
562 }
563 Err(e) => return Err(e),
564 }
565 }
566
567 Ok(count)
568 }
569
570 pub fn replay_committed<F>(&mut self, callback: F) -> Result<u64>
572 where
573 F: FnMut(&FencedWalEntry) -> Result<()>,
574 {
575 let commit_lsn = self.header.last_commit_lsn;
576 let mut wrapped_callback = callback;
577 let mut committed_count = 0u64;
578
579 self.replay(|entry| {
580 if entry.lsn <= commit_lsn {
581 wrapped_callback(entry)?;
582 committed_count += 1;
583 }
584 Ok(())
585 })?;
586
587 Ok(committed_count)
588 }
589}
590
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a scratch directory and the path of a WAL file inside it.
    /// The returned `TempDir` must stay alive for as long as the path is used.
    fn scratch_wal() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.wal");
        (dir, path)
    }

    /// A header written to disk reads back with the same identifying fields.
    #[test]
    fn test_header_roundtrip() {
        let (_dir, path) = scratch_wal();
        let written = WalHeader::new();

        let mut out = File::create(&path).unwrap();
        written.write_to(&mut out).unwrap();
        drop(out);

        let mut input = File::open(&path).unwrap();
        let loaded = WalHeader::read_from(&mut input).unwrap();

        assert_eq!(loaded.magic, written.magic);
        assert_eq!(loaded.epoch, written.epoch);
        assert_eq!(loaded.writer_id, written.writer_id);
    }

    /// Reopening a log bumps the epoch by one and assigns a new writer id.
    #[test]
    fn test_epoch_increment() {
        let (_dir, path) = scratch_wal();

        let (first_epoch, first_writer) = {
            let wal = FencedWal::open(&path).unwrap();
            (wal.epoch(), wal.writer_id())
        };

        let reopened = FencedWal::open(&path).unwrap();

        assert_eq!(reopened.epoch(), first_epoch + 1);
        assert_ne!(first_writer, reopened.writer_id());
    }

    /// Appended entries replay in order with their payloads intact.
    #[test]
    fn test_entry_chain() {
        let (_dir, path) = scratch_wal();
        let mut wal = FencedWal::open(&path).unwrap();

        for payload in [&b"entry1"[..], &b"entry2"[..], &b"entry3"[..]] {
            wal.append(payload.to_vec()).unwrap();
        }
        wal.sync().unwrap();

        let mut seen: Vec<Vec<u8>> = Vec::new();
        wal.replay(|entry| {
            seen.push(entry.payload.clone());
            Ok(())
        })
        .unwrap();

        assert_eq!(seen.len(), 3);
        assert_eq!(seen[0], b"entry1");
        assert_eq!(seen[1], b"entry2");
        assert_eq!(seen[2], b"entry3");
    }

    /// After a reopen, only entries at or below the committed LSN are replayed
    /// by `replay_committed`.
    #[test]
    fn test_commit_replay() {
        let (_dir, path) = scratch_wal();

        {
            let mut writer = FencedWal::open(&path).unwrap();
            writer.append(b"committed1".to_vec()).unwrap();
            writer.append(b"committed2".to_vec()).unwrap();
            writer.commit(2).unwrap();
            writer.append(b"uncommitted".to_vec()).unwrap();
            writer.sync().unwrap();
        }

        let mut reader = FencedWal::open(&path).unwrap();
        let mut seen: Vec<Vec<u8>> = Vec::new();
        reader
            .replay_committed(|entry| {
                seen.push(entry.payload.clone());
                Ok(())
            })
            .unwrap();

        assert_eq!(seen.len(), 2);
        assert_eq!(seen[0], b"committed1");
        assert_eq!(seen[1], b"committed2");
    }
}