Skip to main content

sochdb_storage/
wal_fencing.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! WAL Epoch Fencing for Split-Brain Detection
19//!
20//! This module implements epoch-based fencing to detect concurrent writers
21//! and enable safe recovery from split-brain scenarios.
22//!
23//! ## Problem
24//!
25//! When multiple processes write to the same WAL (even with interleaved appends
26//! that don't physically corrupt bytes), the sequence numbers become meaningless:
27//!
28//! 1. Each process maintains an independent counter
29//! 2. Gaps in sequence numbers indicate lost writes
30//! 3. Duplicate sequences indicate split-brain
31//!
32//! ## Solution
33//!
//! Store a fencing header in the first 64 bytes (`WAL_HEADER_SIZE`) of the WAL
//! file. All integers are little-endian:
//!
//! ```text
//! offset  size  field
//!      0     8  magic ("SOCHDBW")
//!      8     2  version
//!     10     2  flags (reserved)
//!     12     8  epoch
//!     20    16  writer_id (UUID bytes)
//!     36     8  last_commit_lsn
//!     44     4  last_entry_crc
//!     48     8  entry_count
//!     56     4  header_crc (CRC32 of bytes 0..56)
//!     60     4  zero padding
//! ```
42//!
43//! ## Algorithm
44//!
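//! On `FencedWal::open()` of an existing WAL:
//!   - Read the header and verify its magic number and CRC
//!     (mismatch → `SochDBError::Corruption`)
//!   - Scan the entries: an entry whose epoch is newer than the stored
//!     header's epoch means another writer has been active →
//!     `SochDBError::SplitBrain`
//!   - Write a new header with the epoch incremented and a fresh random
//!     `writer_id`, with fsync
//!
//! The stored `writer_id` is not compared against anything on open; every
//! open simply generates a new one.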
50//!
51//! ## Chain Integrity
52//!
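//! Each WAL entry stores the CRC of the previous entry. All integers are
//! little-endian, and `crc` covers every preceding byte of the entry:
//!
//! ```text
//! ┌───────────┬────────────────┬────────────┬──────────────┬───────────┬─────────────┐
//! │ lsn (8B)  │ prev_crc (4B)  │ epoch (8B) │ payload_len  │ payload   │ crc (4B)    │
//! │           │                │            │ (4B)         │ (N bytes) │             │
//! └───────────┴────────────────┴────────────┴──────────────┴───────────┴─────────────┘
//! ```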
60//!
61//! During recovery:
62//! - Verify: entry[i].prev_crc == computed_crc(entry[i-1])
63//! - Break in chain indicates corruption or split-brain interleaving
64//! - Truncate WAL at first chain break
65
66use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
67use std::fs::{File, OpenOptions};
68use std::io::{Read, Seek, SeekFrom, Write};
69use std::path::{Path, PathBuf};
70use uuid::Uuid;
71
72use sochdb_core::{Result, SochDBError};
73
74// =============================================================================
75// Constants
76// =============================================================================
77
/// Magic number identifying SochDB WAL files.
///
/// It is the ASCII text `"SOCHDBW"` read as a big-endian integer
/// (`0x534F4348444257`). Headers store it little-endian, so the on-disk
/// bytes appear reversed.
const WAL_MAGIC: u64 = u64::from_be_bytes(*b"\0SOCHDBW");

/// Number of bytes reserved for the header at the start of every WAL file.
pub const WAL_HEADER_SIZE: usize = 64;

/// On-disk WAL format version written into each header.
const WAL_VERSION: u16 = 1;
86
87// =============================================================================
88// WAL Header
89// =============================================================================
90
/// WAL file header with epoch fencing
///
/// This header is stored in the first 64 bytes of every WAL file.
/// It enables detection of concurrent writers and safe recovery.
///
/// On-disk encoding produced by `write_to` (all integers little-endian):
///
/// ```text
/// offset  size  field
///      0     8  magic
///      8     2  version
///     10     2  flags
///     12     8  epoch
///     20    16  writer_id (raw UUID bytes)
///     36     8  last_commit_lsn
///     44     4  last_entry_crc
///     48     8  entry_count
///     56     4  header_crc (CRC32 over the preceding 56 bytes)
///     60     4  zero padding up to WAL_HEADER_SIZE
/// ```
#[derive(Debug, Clone)]
pub struct WalHeader {
    /// Magic number (0x534F4348444257 = "SOCHDBW"); `read_from` rejects any
    /// other value as corruption
    pub magic: u64,
    /// WAL format version
    pub version: u16,
    /// Reserved flags (always written as 0 by the constructors here)
    pub flags: u16,
    /// Epoch counter - incremented on each writer open
    pub epoch: u64,
    /// UUID of the current writer (a fresh random v4 UUID per header constructed)
    pub writer_id: Uuid,
    /// LSN of the last committed transaction
    pub last_commit_lsn: u64,
    /// CRC of the last entry (for chain verification); becomes the `prev_crc`
    /// of the next appended entry
    pub last_entry_crc: u32,
    /// Number of entries written in current epoch (`new_epoch` resets it to 0,
    /// `update_last_entry_crc` increments it)
    pub entry_count: u64,
    /// CRC32 of the header itself.
    /// NOTE(review): `write_to` recomputes the CRC and never reads this field;
    /// only `read_from` populates it, with the value stored on disk.
    pub header_crc: u32,
}
116
117impl WalHeader {
118    /// Create a new header for a fresh WAL
119    pub fn new() -> Self {
120        Self {
121            magic: WAL_MAGIC,
122            version: WAL_VERSION,
123            flags: 0,
124            epoch: 1,
125            writer_id: Uuid::new_v4(),
126            last_commit_lsn: 0,
127            last_entry_crc: 0,
128            entry_count: 0,
129            header_crc: 0,
130        }
131    }
132
133    /// Create header for a new epoch (when taking over from previous writer)
134    pub fn new_epoch(previous: &WalHeader) -> Self {
135        Self {
136            magic: WAL_MAGIC,
137            version: WAL_VERSION,
138            flags: 0,
139            epoch: previous.epoch + 1,
140            writer_id: Uuid::new_v4(),
141            last_commit_lsn: previous.last_commit_lsn,
142            last_entry_crc: previous.last_entry_crc,
143            entry_count: 0,
144            header_crc: 0,
145        }
146    }
147
148    /// Compute CRC for this header
149    fn compute_crc(&self) -> u32 {
150        let mut hasher = crc32fast::Hasher::new();
151        hasher.update(&self.magic.to_le_bytes());
152        hasher.update(&self.version.to_le_bytes());
153        hasher.update(&self.flags.to_le_bytes());
154        hasher.update(&self.epoch.to_le_bytes());
155        hasher.update(self.writer_id.as_bytes());
156        hasher.update(&self.last_commit_lsn.to_le_bytes());
157        hasher.update(&self.last_entry_crc.to_le_bytes());
158        hasher.update(&self.entry_count.to_le_bytes());
159        hasher.finalize()
160    }
161
162    /// Read header from file
163    pub fn read_from(file: &mut File) -> Result<Self> {
164        file.seek(SeekFrom::Start(0))?;
165
166        let magic = file.read_u64::<LittleEndian>()?;
167        if magic != WAL_MAGIC {
168            return Err(SochDBError::Corruption(format!(
169                "Invalid WAL magic: expected {:x}, got {:x}",
170                WAL_MAGIC, magic
171            )));
172        }
173
174        let version = file.read_u16::<LittleEndian>()?;
175        let flags = file.read_u16::<LittleEndian>()?;
176        let epoch = file.read_u64::<LittleEndian>()?;
177
178        let mut writer_id_bytes = [0u8; 16];
179        file.read_exact(&mut writer_id_bytes)?;
180        let writer_id = Uuid::from_bytes(writer_id_bytes);
181
182        let last_commit_lsn = file.read_u64::<LittleEndian>()?;
183        let last_entry_crc = file.read_u32::<LittleEndian>()?;
184        let entry_count = file.read_u64::<LittleEndian>()?;
185        let header_crc = file.read_u32::<LittleEndian>()?;
186
187        let header = Self {
188            magic,
189            version,
190            flags,
191            epoch,
192            writer_id,
193            last_commit_lsn,
194            last_entry_crc,
195            entry_count,
196            header_crc,
197        };
198
199        // Verify CRC
200        let computed_crc = header.compute_crc();
201        if computed_crc != header_crc {
202            return Err(SochDBError::Corruption(format!(
203                "WAL header CRC mismatch: expected {:x}, got {:x}",
204                computed_crc, header_crc
205            )));
206        }
207
208        Ok(header)
209    }
210
211    /// Write header to file
212    pub fn write_to(&self, file: &mut File) -> Result<()> {
213        file.seek(SeekFrom::Start(0))?;
214
215        file.write_u64::<LittleEndian>(self.magic)?;
216        file.write_u16::<LittleEndian>(self.version)?;
217        file.write_u16::<LittleEndian>(self.flags)?;
218        file.write_u64::<LittleEndian>(self.epoch)?;
219        file.write_all(self.writer_id.as_bytes())?;
220        file.write_u64::<LittleEndian>(self.last_commit_lsn)?;
221        file.write_u32::<LittleEndian>(self.last_entry_crc)?;
222        file.write_u64::<LittleEndian>(self.entry_count)?;
223
224        // Compute and write CRC
225        let crc = self.compute_crc();
226        file.write_u32::<LittleEndian>(crc)?;
227
228        // Pad to 64 bytes
229        let written = 8 + 2 + 2 + 8 + 16 + 8 + 4 + 8 + 4; // = 60 bytes
230        let padding = WAL_HEADER_SIZE - written;
231        file.write_all(&vec![0u8; padding])?;
232
233        file.sync_all()?;
234        Ok(())
235    }
236
237    /// Update last entry CRC (called after each write)
238    pub fn update_last_entry_crc(&mut self, crc: u32) {
239        self.last_entry_crc = crc;
240        self.entry_count += 1;
241    }
242
243    /// Update last commit LSN (called after each commit)
244    pub fn update_last_commit(&mut self, lsn: u64) {
245        self.last_commit_lsn = lsn;
246    }
247}
248
249impl Default for WalHeader {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255// =============================================================================
256// Fenced WAL Entry
257// =============================================================================
258
/// WAL entry with epoch fencing and CRC chain
///
/// Each entry contains:
/// - Its own LSN (sequence number)
/// - The CRC of the previous entry (chain verification)
/// - The current epoch (detects interleaved writes)
/// - The payload data
/// - Its own CRC
///
/// Encoded form (see `to_bytes` / `read_from`, all integers little-endian):
/// `lsn u64 | prev_crc u32 | epoch u64 | payload_len u32 | payload | crc u32`
#[derive(Debug, Clone)]
pub struct FencedWalEntry {
    /// Log Sequence Number: a sequence number assigned by `FencedWal::append`,
    /// not a byte offset into the file
    pub lsn: u64,
    /// CRC of the previous entry (0 for first entry)
    pub prev_crc: u32,
    /// Epoch when this entry was written
    pub epoch: u64,
    /// Payload data
    pub payload: Vec<u8>,
    /// CRC of this entry, covering all of the fields above (including the
    /// payload length)
    pub crc: u32,
}
280
281impl FencedWalEntry {
282    /// Entry header size (before payload)
283    const HEADER_SIZE: usize = 8 + 4 + 8 + 4; // lsn + prev_crc + epoch + payload_len
284    /// Entry footer size (after payload)
285    const FOOTER_SIZE: usize = 4; // crc
286
287    /// Create a new fenced entry
288    pub fn new(lsn: u64, prev_crc: u32, epoch: u64, payload: Vec<u8>) -> Self {
289        let mut entry = Self {
290            lsn,
291            prev_crc,
292            epoch,
293            payload,
294            crc: 0,
295        };
296        entry.crc = entry.compute_crc();
297        entry
298    }
299
300    /// Compute CRC for this entry
301    fn compute_crc(&self) -> u32 {
302        let mut hasher = crc32fast::Hasher::new();
303        hasher.update(&self.lsn.to_le_bytes());
304        hasher.update(&self.prev_crc.to_le_bytes());
305        hasher.update(&self.epoch.to_le_bytes());
306        hasher.update(&(self.payload.len() as u32).to_le_bytes());
307        hasher.update(&self.payload);
308        hasher.finalize()
309    }
310
311    /// Serialize entry to bytes
312    pub fn to_bytes(&self) -> Vec<u8> {
313        let total_len = Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE;
314        let mut buf = Vec::with_capacity(total_len);
315
316        buf.extend_from_slice(&self.lsn.to_le_bytes());
317        buf.extend_from_slice(&self.prev_crc.to_le_bytes());
318        buf.extend_from_slice(&self.epoch.to_le_bytes());
319        buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
320        buf.extend_from_slice(&self.payload);
321        buf.extend_from_slice(&self.crc.to_le_bytes());
322
323        buf
324    }
325
326    /// Read entry from reader
327    pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
328        let lsn = reader.read_u64::<LittleEndian>()?;
329        let prev_crc = reader.read_u32::<LittleEndian>()?;
330        let epoch = reader.read_u64::<LittleEndian>()?;
331        let payload_len = reader.read_u32::<LittleEndian>()? as usize;
332
333        let mut payload = vec![0u8; payload_len];
334        reader.read_exact(&mut payload)?;
335
336        let crc = reader.read_u32::<LittleEndian>()?;
337
338        let entry = Self {
339            lsn,
340            prev_crc,
341            epoch,
342            payload,
343            crc,
344        };
345
346        // Verify CRC
347        let computed_crc = entry.compute_crc();
348        if computed_crc != crc {
349            return Err(SochDBError::Corruption(format!(
350                "WAL entry CRC mismatch at LSN {}: expected {:x}, got {:x}",
351                lsn, computed_crc, crc
352            )));
353        }
354
355        Ok(entry)
356    }
357
358    /// Get the size of this entry in bytes
359    pub fn size(&self) -> usize {
360        Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE
361    }
362}
363
364// =============================================================================
365// Fenced WAL Manager
366// =============================================================================
367
368/// WAL manager with epoch fencing for split-brain protection
369///
370/// This is a wrapper around the standard WAL that adds:
371/// - Epoch-based writer fencing
372/// - CRC chain verification
373/// - Split-brain detection during recovery
374pub struct FencedWal {
375    /// Path to WAL file
376    path: PathBuf,
377    /// Current header
378    header: WalHeader,
379    /// File handle
380    file: File,
381    /// Current write position (after header)
382    write_pos: u64,
383}
384
385impl FencedWal {
386    /// Open or create a fenced WAL
387    ///
388    /// If the WAL exists with a different writer_id in the same epoch,
389    /// returns an error indicating split-brain.
390    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
391        let path = path.as_ref().to_path_buf();
392
393        // Ensure parent directory exists
394        if let Some(parent) = path.parent() {
395            std::fs::create_dir_all(parent)?;
396        }
397
398        let file_exists = path.exists();
399        let mut file = OpenOptions::new()
400            .create(true)
401            .read(true)
402            .write(true)
403            .open(&path)?;
404
405        let (header, write_pos) = if file_exists && file.metadata()?.len() >= WAL_HEADER_SIZE as u64
406        {
407            // Read existing header
408            let existing_header = WalHeader::read_from(&mut file)?;
409
410            // Create new epoch header
411            let new_header = WalHeader::new_epoch(&existing_header);
412            new_header.write_to(&mut file)?;
413
414            // Find write position by scanning to end
415            let write_pos = Self::find_write_position(&mut file, &existing_header)?;
416
417            (new_header, write_pos)
418        } else {
419            // Fresh WAL
420            let header = WalHeader::new();
421            header.write_to(&mut file)?;
422            (header, WAL_HEADER_SIZE as u64)
423        };
424
425        Ok(Self {
426            path,
427            header,
428            file,
429            write_pos,
430        })
431    }
432
433    /// Find the position after the last valid entry
434    fn find_write_position(file: &mut File, header: &WalHeader) -> Result<u64> {
435        file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
436
437        let mut pos = WAL_HEADER_SIZE as u64;
438        let mut prev_crc = 0u32;
439        let mut entries_verified = 0u64;
440
441        loop {
442            match FencedWalEntry::read_from(file) {
443                Ok(entry) => {
444                    // Verify chain integrity
445                    if entries_verified > 0 && entry.prev_crc != prev_crc {
446                        // Chain broken - truncate here
447                        eprintln!(
448                            "WAL chain broken at LSN {}: expected prev_crc {:x}, got {:x}",
449                            entry.lsn, prev_crc, entry.prev_crc
450                        );
451                        break;
452                    }
453
454                    // Verify epoch
455                    if entry.epoch > header.epoch {
456                        // Future epoch - corruption or split-brain
457                        return Err(SochDBError::SplitBrain(format!(
458                            "Entry has future epoch {} > header epoch {}",
459                            entry.epoch, header.epoch
460                        )));
461                    }
462
463                    prev_crc = entry.crc;
464                    pos += entry.size() as u64;
465                    entries_verified += 1;
466                }
467                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
468                    break;
469                }
470                Err(SochDBError::Corruption(_)) => {
471                    // Corrupted entry - truncate here
472                    break;
473                }
474                Err(e) => return Err(e),
475            }
476        }
477
478        // Truncate any garbage at the end
479        file.set_len(pos)?;
480        file.seek(SeekFrom::Start(pos))?;
481
482        Ok(pos)
483    }
484
485    /// Append an entry with epoch fencing
486    pub fn append(&mut self, payload: Vec<u8>) -> Result<u64> {
487        let lsn = self.header.entry_count + 1;
488        let entry = FencedWalEntry::new(
489            lsn,
490            self.header.last_entry_crc,
491            self.header.epoch,
492            payload,
493        );
494
495        let bytes = entry.to_bytes();
496        self.file.seek(SeekFrom::Start(self.write_pos))?;
497        self.file.write_all(&bytes)?;
498
499        self.write_pos += bytes.len() as u64;
500        self.header.update_last_entry_crc(entry.crc);
501
502        Ok(lsn)
503    }
504
505    /// Sync to disk and update header
506    pub fn sync(&mut self) -> Result<()> {
507        self.file.sync_all()?;
508        self.header.write_to(&mut self.file)?;
509        Ok(())
510    }
511
512    /// Mark a commit point
513    pub fn commit(&mut self, lsn: u64) -> Result<()> {
514        self.header.update_last_commit(lsn);
515        self.sync()
516    }
517
518    /// Get current epoch
519    pub fn epoch(&self) -> u64 {
520        self.header.epoch
521    }
522
523    /// Get current writer ID
524    pub fn writer_id(&self) -> Uuid {
525        self.header.writer_id
526    }
527
528    /// Get last committed LSN
529    pub fn last_commit_lsn(&self) -> u64 {
530        self.header.last_commit_lsn
531    }
532
533    /// Get entry count
534    pub fn entry_count(&self) -> u64 {
535        self.header.entry_count
536    }
537
538    /// Replay all entries, verifying chain integrity
539    pub fn replay<F>(&mut self, mut callback: F) -> Result<u64>
540    where
541        F: FnMut(&FencedWalEntry) -> Result<()>,
542    {
543        self.file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
544
545        let mut prev_crc = 0u32;
546        let mut count = 0u64;
547
548        loop {
549            match FencedWalEntry::read_from(&mut self.file) {
550                Ok(entry) => {
551                    // Verify chain
552                    if count > 0 && entry.prev_crc != prev_crc {
553                        return Err(SochDBError::Corruption(format!(
554                            "Chain broken at LSN {}: expected {:x}, got {:x}",
555                            entry.lsn, prev_crc, entry.prev_crc
556                        )));
557                    }
558
559                    callback(&entry)?;
560                    prev_crc = entry.crc;
561                    count += 1;
562                }
563                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
564                    break;
565                }
566                Err(e) => return Err(e),
567            }
568        }
569
570        Ok(count)
571    }
572
573    /// Replay only committed entries
574    pub fn replay_committed<F>(&mut self, callback: F) -> Result<u64>
575    where
576        F: FnMut(&FencedWalEntry) -> Result<()>,
577    {
578        let commit_lsn = self.header.last_commit_lsn;
579        let mut wrapped_callback = callback;
580        let mut committed_count = 0u64;
581
582        self.replay(|entry| {
583            if entry.lsn <= commit_lsn {
584                wrapped_callback(entry)?;
585                committed_count += 1;
586            }
587            Ok(())
588        })?;
589
590        Ok(committed_count)
591    }
592}
593
594// =============================================================================
595// Tests
596// =============================================================================
597
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_header_roundtrip() {
        let tmp = TempDir::new().unwrap();
        let wal_file = tmp.path().join("test.wal");
        let written = WalHeader::new();

        // Each file handle is a temporary, so it is closed at the end of its statement.
        written
            .write_to(&mut File::create(&wal_file).unwrap())
            .unwrap();
        let decoded = WalHeader::read_from(&mut File::open(&wal_file).unwrap()).unwrap();

        assert_eq!(decoded.magic, written.magic);
        assert_eq!(decoded.epoch, written.epoch);
        assert_eq!(decoded.writer_id, written.writer_id);
    }

    #[test]
    fn test_epoch_increment() {
        let tmp = TempDir::new().unwrap();
        let wal_file = tmp.path().join("test.wal");

        // First writer; its handle is dropped when this block ends.
        let (first_epoch, first_writer) = {
            let first = FencedWal::open(&wal_file).unwrap();
            (first.epoch(), first.writer_id())
        };

        // Reopening must start a newer epoch under a different writer id.
        let second = FencedWal::open(&wal_file).unwrap();
        assert_eq!(second.epoch(), first_epoch + 1);
        assert_ne!(first_writer, second.writer_id());
    }

    #[test]
    fn test_entry_chain() {
        let tmp = TempDir::new().unwrap();
        let mut wal = FencedWal::open(tmp.path().join("test.wal")).unwrap();

        for payload in [b"entry1", b"entry2", b"entry3"] {
            wal.append(payload.to_vec()).unwrap();
        }
        wal.sync().unwrap();

        let mut replayed: Vec<Vec<u8>> = Vec::new();
        wal.replay(|entry| {
            replayed.push(entry.payload.clone());
            Ok(())
        })
        .unwrap();

        assert_eq!(
            replayed,
            [b"entry1".to_vec(), b"entry2".to_vec(), b"entry3".to_vec()]
        );
    }

    #[test]
    fn test_commit_replay() {
        let tmp = TempDir::new().unwrap();
        let wal_file = tmp.path().join("test.wal");

        {
            let mut writer = FencedWal::open(&wal_file).unwrap();
            for payload in [b"committed1", b"committed2"] {
                writer.append(payload.to_vec()).unwrap();
            }
            writer.commit(2).unwrap();
            writer.append(b"uncommitted".to_vec()).unwrap();
            writer.sync().unwrap();
        }

        // A fresh handle must surface only the entries up to the commit point.
        let mut reopened = FencedWal::open(&wal_file).unwrap();
        let mut replayed: Vec<Vec<u8>> = Vec::new();
        reopened
            .replay_committed(|entry| {
                replayed.push(entry.payload.clone());
                Ok(())
            })
            .unwrap();

        assert_eq!(replayed, [b"committed1".to_vec(), b"committed2".to_vec()]);
    }
}