sochdb_storage/
wal_fencing.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! WAL Epoch Fencing for Split-Brain Detection
16//!
17//! This module implements epoch-based fencing to detect concurrent writers
18//! and enable safe recovery from split-brain scenarios.
19//!
20//! ## Problem
21//!
22//! When multiple processes write to the same WAL (even with interleaved appends
23//! that don't physically corrupt bytes), the sequence numbers become meaningless:
24//!
25//! 1. Each process maintains an independent counter
26//! 2. Gaps in sequence numbers indicate lost writes
27//! 3. Duplicate sequences indicate split-brain
28//!
29//! ## Solution
30//!
31//! Store a fencing header in the first 64 bytes of the WAL file:
32//!
//! ```text
//! offset  size  field
//! 0       8     magic            (WAL_MAGIC, little-endian u64)
//! 8       2     version
//! 10      2     flags            (reserved)
//! 12      8     epoch
//! 20      16    writer_id        (UUID bytes)
//! 36      8     last_commit_lsn
//! 44      4     last_entry_crc
//! 48      8     entry_count
//! 56      4     header_crc       (CRC32 of bytes 0..56)
//! 60      4     zero padding
//! ```
39//!
40//! ## Algorithm
41//!
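//! On `FencedWal::open()` of an existing WAL, the header is read and its magic
//! and CRC are verified. A replacement header with `epoch + 1` and a freshly
//! generated writer UUID is written and fsynced. The entries after the header
//! are then scanned:
//!   - the log is truncated at the first torn/corrupt entry or CRC-chain break;
//!   - an entry whose epoch is newer than the on-disk header's epoch fails the
//!     open with `SochDBError::SplitBrain`.
//!
//! The stored `writer_id` is not compared on open. Every open generates a new
//! UUID, so the field records who owns the current epoch rather than gating access.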
47//!
48//! ## Chain Integrity
49//!
50//! Each WAL entry stores the CRC of the previous entry:
51//!
//! ```text
//! ┌──────────┬───────────────┬────────────┬──────────────┬─────────┬───────────┐
//! │ lsn (8B) │ prev_crc (4B) │ epoch (8B) │ payload_len  │ payload │ crc (4B)  │
//! │          │               │            │ (4B)         │         │           │
//! └──────────┴───────────────┴────────────┴──────────────┴─────────┴───────────┘
//! ```
57//!
58//! During recovery:
59//! - Verify: entry[i].prev_crc == computed_crc(entry[i-1])
60//! - Break in chain indicates corruption or split-brain interleaving
61//! - Truncate WAL at first chain break
62
63use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
64use std::fs::{File, OpenOptions};
65use std::io::{Read, Seek, SeekFrom, Write};
66use std::path::{Path, PathBuf};
67use uuid::Uuid;
68
69use sochdb_core::{Result, SochDBError};
70
71// =============================================================================
72// Constants
73// =============================================================================
74
/// Magic number at offset 0 of every SochDB WAL file.
///
/// Read as a big-endian integer, the hex digits spell the ASCII bytes
/// "SOCHDBW". The header stores it little-endian, so the raw on-disk bytes
/// appear reversed.
const WAL_MAGIC: u64 = 0x0053_4F43_4844_4257;

/// Size in bytes of the fixed header slot at the start of the WAL file.
pub const WAL_HEADER_SIZE: usize = 64;

/// WAL format version written into newly created headers.
const WAL_VERSION: u16 = 1;
83
84// =============================================================================
85// WAL Header
86// =============================================================================
87
88/// WAL file header with epoch fencing
89///
90/// This header is stored in the first 64 bytes of every WAL file.
91/// It enables detection of concurrent writers and safe recovery.
92#[derive(Debug, Clone)]
93pub struct WalHeader {
94    /// Magic number (0x534F4348444257 = "SOCHDBW")
95    pub magic: u64,
96    /// WAL format version
97    pub version: u16,
98    /// Reserved flags
99    pub flags: u16,
100    /// Epoch counter - incremented on each writer open
101    pub epoch: u64,
102    /// UUID of the current writer
103    pub writer_id: Uuid,
104    /// LSN of the last committed transaction
105    pub last_commit_lsn: u64,
106    /// CRC of the last entry (for chain verification)
107    pub last_entry_crc: u32,
108    /// Number of entries written in current epoch
109    pub entry_count: u64,
110    /// CRC32 of the header itself
111    pub header_crc: u32,
112}
113
114impl WalHeader {
115    /// Create a new header for a fresh WAL
116    pub fn new() -> Self {
117        Self {
118            magic: WAL_MAGIC,
119            version: WAL_VERSION,
120            flags: 0,
121            epoch: 1,
122            writer_id: Uuid::new_v4(),
123            last_commit_lsn: 0,
124            last_entry_crc: 0,
125            entry_count: 0,
126            header_crc: 0,
127        }
128    }
129
130    /// Create header for a new epoch (when taking over from previous writer)
131    pub fn new_epoch(previous: &WalHeader) -> Self {
132        Self {
133            magic: WAL_MAGIC,
134            version: WAL_VERSION,
135            flags: 0,
136            epoch: previous.epoch + 1,
137            writer_id: Uuid::new_v4(),
138            last_commit_lsn: previous.last_commit_lsn,
139            last_entry_crc: previous.last_entry_crc,
140            entry_count: 0,
141            header_crc: 0,
142        }
143    }
144
145    /// Compute CRC for this header
146    fn compute_crc(&self) -> u32 {
147        let mut hasher = crc32fast::Hasher::new();
148        hasher.update(&self.magic.to_le_bytes());
149        hasher.update(&self.version.to_le_bytes());
150        hasher.update(&self.flags.to_le_bytes());
151        hasher.update(&self.epoch.to_le_bytes());
152        hasher.update(self.writer_id.as_bytes());
153        hasher.update(&self.last_commit_lsn.to_le_bytes());
154        hasher.update(&self.last_entry_crc.to_le_bytes());
155        hasher.update(&self.entry_count.to_le_bytes());
156        hasher.finalize()
157    }
158
159    /// Read header from file
160    pub fn read_from(file: &mut File) -> Result<Self> {
161        file.seek(SeekFrom::Start(0))?;
162
163        let magic = file.read_u64::<LittleEndian>()?;
164        if magic != WAL_MAGIC {
165            return Err(SochDBError::Corruption(format!(
166                "Invalid WAL magic: expected {:x}, got {:x}",
167                WAL_MAGIC, magic
168            )));
169        }
170
171        let version = file.read_u16::<LittleEndian>()?;
172        let flags = file.read_u16::<LittleEndian>()?;
173        let epoch = file.read_u64::<LittleEndian>()?;
174
175        let mut writer_id_bytes = [0u8; 16];
176        file.read_exact(&mut writer_id_bytes)?;
177        let writer_id = Uuid::from_bytes(writer_id_bytes);
178
179        let last_commit_lsn = file.read_u64::<LittleEndian>()?;
180        let last_entry_crc = file.read_u32::<LittleEndian>()?;
181        let entry_count = file.read_u64::<LittleEndian>()?;
182        let header_crc = file.read_u32::<LittleEndian>()?;
183
184        let header = Self {
185            magic,
186            version,
187            flags,
188            epoch,
189            writer_id,
190            last_commit_lsn,
191            last_entry_crc,
192            entry_count,
193            header_crc,
194        };
195
196        // Verify CRC
197        let computed_crc = header.compute_crc();
198        if computed_crc != header_crc {
199            return Err(SochDBError::Corruption(format!(
200                "WAL header CRC mismatch: expected {:x}, got {:x}",
201                computed_crc, header_crc
202            )));
203        }
204
205        Ok(header)
206    }
207
208    /// Write header to file
209    pub fn write_to(&self, file: &mut File) -> Result<()> {
210        file.seek(SeekFrom::Start(0))?;
211
212        file.write_u64::<LittleEndian>(self.magic)?;
213        file.write_u16::<LittleEndian>(self.version)?;
214        file.write_u16::<LittleEndian>(self.flags)?;
215        file.write_u64::<LittleEndian>(self.epoch)?;
216        file.write_all(self.writer_id.as_bytes())?;
217        file.write_u64::<LittleEndian>(self.last_commit_lsn)?;
218        file.write_u32::<LittleEndian>(self.last_entry_crc)?;
219        file.write_u64::<LittleEndian>(self.entry_count)?;
220
221        // Compute and write CRC
222        let crc = self.compute_crc();
223        file.write_u32::<LittleEndian>(crc)?;
224
225        // Pad to 64 bytes
226        let written = 8 + 2 + 2 + 8 + 16 + 8 + 4 + 8 + 4; // = 60 bytes
227        let padding = WAL_HEADER_SIZE - written;
228        file.write_all(&vec![0u8; padding])?;
229
230        file.sync_all()?;
231        Ok(())
232    }
233
234    /// Update last entry CRC (called after each write)
235    pub fn update_last_entry_crc(&mut self, crc: u32) {
236        self.last_entry_crc = crc;
237        self.entry_count += 1;
238    }
239
240    /// Update last commit LSN (called after each commit)
241    pub fn update_last_commit(&mut self, lsn: u64) {
242        self.last_commit_lsn = lsn;
243    }
244}
245
246impl Default for WalHeader {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252// =============================================================================
253// Fenced WAL Entry
254// =============================================================================
255
/// WAL entry with epoch fencing and CRC chain
///
/// Each entry contains:
/// - Its own LSN (sequence number)
/// - The CRC of the previous entry (chain verification)
/// - The current epoch (detects interleaved writes)
/// - The payload data
/// - Its own CRC
///
/// Serialized little-endian as:
/// `lsn u64 | prev_crc u32 | epoch u64 | payload_len u32 | payload | crc u32`
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencedWalEntry {
    /// Log Sequence Number
    pub lsn: u64,
    /// CRC of the previous entry (0 for first entry)
    pub prev_crc: u32,
    /// Epoch when this entry was written
    pub epoch: u64,
    /// Payload data
    pub payload: Vec<u8>,
    /// CRC32 over lsn, prev_crc, epoch, payload length and payload
    pub crc: u32,
}
277
278impl FencedWalEntry {
279    /// Entry header size (before payload)
280    const HEADER_SIZE: usize = 8 + 4 + 8 + 4; // lsn + prev_crc + epoch + payload_len
281    /// Entry footer size (after payload)
282    const FOOTER_SIZE: usize = 4; // crc
283
284    /// Create a new fenced entry
285    pub fn new(lsn: u64, prev_crc: u32, epoch: u64, payload: Vec<u8>) -> Self {
286        let mut entry = Self {
287            lsn,
288            prev_crc,
289            epoch,
290            payload,
291            crc: 0,
292        };
293        entry.crc = entry.compute_crc();
294        entry
295    }
296
297    /// Compute CRC for this entry
298    fn compute_crc(&self) -> u32 {
299        let mut hasher = crc32fast::Hasher::new();
300        hasher.update(&self.lsn.to_le_bytes());
301        hasher.update(&self.prev_crc.to_le_bytes());
302        hasher.update(&self.epoch.to_le_bytes());
303        hasher.update(&(self.payload.len() as u32).to_le_bytes());
304        hasher.update(&self.payload);
305        hasher.finalize()
306    }
307
308    /// Serialize entry to bytes
309    pub fn to_bytes(&self) -> Vec<u8> {
310        let total_len = Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE;
311        let mut buf = Vec::with_capacity(total_len);
312
313        buf.extend_from_slice(&self.lsn.to_le_bytes());
314        buf.extend_from_slice(&self.prev_crc.to_le_bytes());
315        buf.extend_from_slice(&self.epoch.to_le_bytes());
316        buf.extend_from_slice(&(self.payload.len() as u32).to_le_bytes());
317        buf.extend_from_slice(&self.payload);
318        buf.extend_from_slice(&self.crc.to_le_bytes());
319
320        buf
321    }
322
323    /// Read entry from reader
324    pub fn read_from<R: Read>(reader: &mut R) -> Result<Self> {
325        let lsn = reader.read_u64::<LittleEndian>()?;
326        let prev_crc = reader.read_u32::<LittleEndian>()?;
327        let epoch = reader.read_u64::<LittleEndian>()?;
328        let payload_len = reader.read_u32::<LittleEndian>()? as usize;
329
330        let mut payload = vec![0u8; payload_len];
331        reader.read_exact(&mut payload)?;
332
333        let crc = reader.read_u32::<LittleEndian>()?;
334
335        let entry = Self {
336            lsn,
337            prev_crc,
338            epoch,
339            payload,
340            crc,
341        };
342
343        // Verify CRC
344        let computed_crc = entry.compute_crc();
345        if computed_crc != crc {
346            return Err(SochDBError::Corruption(format!(
347                "WAL entry CRC mismatch at LSN {}: expected {:x}, got {:x}",
348                lsn, computed_crc, crc
349            )));
350        }
351
352        Ok(entry)
353    }
354
355    /// Get the size of this entry in bytes
356    pub fn size(&self) -> usize {
357        Self::HEADER_SIZE + self.payload.len() + Self::FOOTER_SIZE
358    }
359}
360
361// =============================================================================
362// Fenced WAL Manager
363// =============================================================================
364
365/// WAL manager with epoch fencing for split-brain protection
366///
367/// This is a wrapper around the standard WAL that adds:
368/// - Epoch-based writer fencing
369/// - CRC chain verification
370/// - Split-brain detection during recovery
371pub struct FencedWal {
372    /// Path to WAL file
373    path: PathBuf,
374    /// Current header
375    header: WalHeader,
376    /// File handle
377    file: File,
378    /// Current write position (after header)
379    write_pos: u64,
380}
381
382impl FencedWal {
383    /// Open or create a fenced WAL
384    ///
385    /// If the WAL exists with a different writer_id in the same epoch,
386    /// returns an error indicating split-brain.
387    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
388        let path = path.as_ref().to_path_buf();
389
390        // Ensure parent directory exists
391        if let Some(parent) = path.parent() {
392            std::fs::create_dir_all(parent)?;
393        }
394
395        let file_exists = path.exists();
396        let mut file = OpenOptions::new()
397            .create(true)
398            .read(true)
399            .write(true)
400            .open(&path)?;
401
402        let (header, write_pos) = if file_exists && file.metadata()?.len() >= WAL_HEADER_SIZE as u64
403        {
404            // Read existing header
405            let existing_header = WalHeader::read_from(&mut file)?;
406
407            // Create new epoch header
408            let new_header = WalHeader::new_epoch(&existing_header);
409            new_header.write_to(&mut file)?;
410
411            // Find write position by scanning to end
412            let write_pos = Self::find_write_position(&mut file, &existing_header)?;
413
414            (new_header, write_pos)
415        } else {
416            // Fresh WAL
417            let header = WalHeader::new();
418            header.write_to(&mut file)?;
419            (header, WAL_HEADER_SIZE as u64)
420        };
421
422        Ok(Self {
423            path,
424            header,
425            file,
426            write_pos,
427        })
428    }
429
430    /// Find the position after the last valid entry
431    fn find_write_position(file: &mut File, header: &WalHeader) -> Result<u64> {
432        file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
433
434        let mut pos = WAL_HEADER_SIZE as u64;
435        let mut prev_crc = 0u32;
436        let mut entries_verified = 0u64;
437
438        loop {
439            match FencedWalEntry::read_from(file) {
440                Ok(entry) => {
441                    // Verify chain integrity
442                    if entries_verified > 0 && entry.prev_crc != prev_crc {
443                        // Chain broken - truncate here
444                        eprintln!(
445                            "WAL chain broken at LSN {}: expected prev_crc {:x}, got {:x}",
446                            entry.lsn, prev_crc, entry.prev_crc
447                        );
448                        break;
449                    }
450
451                    // Verify epoch
452                    if entry.epoch > header.epoch {
453                        // Future epoch - corruption or split-brain
454                        return Err(SochDBError::SplitBrain(format!(
455                            "Entry has future epoch {} > header epoch {}",
456                            entry.epoch, header.epoch
457                        )));
458                    }
459
460                    prev_crc = entry.crc;
461                    pos += entry.size() as u64;
462                    entries_verified += 1;
463                }
464                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
465                    break;
466                }
467                Err(SochDBError::Corruption(_)) => {
468                    // Corrupted entry - truncate here
469                    break;
470                }
471                Err(e) => return Err(e),
472            }
473        }
474
475        // Truncate any garbage at the end
476        file.set_len(pos)?;
477        file.seek(SeekFrom::Start(pos))?;
478
479        Ok(pos)
480    }
481
482    /// Append an entry with epoch fencing
483    pub fn append(&mut self, payload: Vec<u8>) -> Result<u64> {
484        let lsn = self.header.entry_count + 1;
485        let entry = FencedWalEntry::new(
486            lsn,
487            self.header.last_entry_crc,
488            self.header.epoch,
489            payload,
490        );
491
492        let bytes = entry.to_bytes();
493        self.file.seek(SeekFrom::Start(self.write_pos))?;
494        self.file.write_all(&bytes)?;
495
496        self.write_pos += bytes.len() as u64;
497        self.header.update_last_entry_crc(entry.crc);
498
499        Ok(lsn)
500    }
501
502    /// Sync to disk and update header
503    pub fn sync(&mut self) -> Result<()> {
504        self.file.sync_all()?;
505        self.header.write_to(&mut self.file)?;
506        Ok(())
507    }
508
509    /// Mark a commit point
510    pub fn commit(&mut self, lsn: u64) -> Result<()> {
511        self.header.update_last_commit(lsn);
512        self.sync()
513    }
514
515    /// Get current epoch
516    pub fn epoch(&self) -> u64 {
517        self.header.epoch
518    }
519
520    /// Get current writer ID
521    pub fn writer_id(&self) -> Uuid {
522        self.header.writer_id
523    }
524
525    /// Get last committed LSN
526    pub fn last_commit_lsn(&self) -> u64 {
527        self.header.last_commit_lsn
528    }
529
530    /// Get entry count
531    pub fn entry_count(&self) -> u64 {
532        self.header.entry_count
533    }
534
535    /// Replay all entries, verifying chain integrity
536    pub fn replay<F>(&mut self, mut callback: F) -> Result<u64>
537    where
538        F: FnMut(&FencedWalEntry) -> Result<()>,
539    {
540        self.file.seek(SeekFrom::Start(WAL_HEADER_SIZE as u64))?;
541
542        let mut prev_crc = 0u32;
543        let mut count = 0u64;
544
545        loop {
546            match FencedWalEntry::read_from(&mut self.file) {
547                Ok(entry) => {
548                    // Verify chain
549                    if count > 0 && entry.prev_crc != prev_crc {
550                        return Err(SochDBError::Corruption(format!(
551                            "Chain broken at LSN {}: expected {:x}, got {:x}",
552                            entry.lsn, prev_crc, entry.prev_crc
553                        )));
554                    }
555
556                    callback(&entry)?;
557                    prev_crc = entry.crc;
558                    count += 1;
559                }
560                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
561                    break;
562                }
563                Err(e) => return Err(e),
564            }
565        }
566
567        Ok(count)
568    }
569
570    /// Replay only committed entries
571    pub fn replay_committed<F>(&mut self, callback: F) -> Result<u64>
572    where
573        F: FnMut(&FencedWalEntry) -> Result<()>,
574    {
575        let commit_lsn = self.header.last_commit_lsn;
576        let mut wrapped_callback = callback;
577        let mut committed_count = 0u64;
578
579        self.replay(|entry| {
580            if entry.lsn <= commit_lsn {
581                wrapped_callback(entry)?;
582                committed_count += 1;
583            }
584            Ok(())
585        })?;
586
587        Ok(committed_count)
588    }
589}
590
591// =============================================================================
592// Tests
593// =============================================================================
594
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a scratch directory plus a WAL path inside it.
    ///
    /// The `TempDir` must be kept alive for as long as the path is used.
    fn temp_wal() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let wal_path = dir.path().join("test.wal");
        (dir, wal_path)
    }

    #[test]
    fn test_header_roundtrip() {
        let (_dir, wal_path) = temp_wal();
        let original = WalHeader::new();

        original
            .write_to(&mut File::create(&wal_path).unwrap())
            .unwrap();

        let decoded = WalHeader::read_from(&mut File::open(&wal_path).unwrap()).unwrap();
        assert_eq!(decoded.magic, original.magic);
        assert_eq!(decoded.epoch, original.epoch);
        assert_eq!(decoded.writer_id, original.writer_id);
    }

    #[test]
    fn test_epoch_increment() {
        let (_dir, wal_path) = temp_wal();

        // The first writer's handle is dropped at the end of this block.
        let (first_epoch, first_writer) = {
            let wal = FencedWal::open(&wal_path).unwrap();
            (wal.epoch(), wal.writer_id())
        };

        let reopened = FencedWal::open(&wal_path).unwrap();
        assert_eq!(reopened.epoch(), first_epoch + 1);
        assert_ne!(reopened.writer_id(), first_writer);
    }

    #[test]
    fn test_entry_chain() {
        let (_dir, wal_path) = temp_wal();
        let mut wal = FencedWal::open(&wal_path).unwrap();

        for payload in [b"entry1", b"entry2", b"entry3"] {
            wal.append(payload.to_vec()).unwrap();
        }
        wal.sync().unwrap();

        let mut replayed: Vec<Vec<u8>> = Vec::new();
        wal.replay(|entry| {
            replayed.push(entry.payload.clone());
            Ok(())
        })
        .unwrap();

        assert_eq!(
            replayed,
            vec![b"entry1".to_vec(), b"entry2".to_vec(), b"entry3".to_vec()]
        );
    }

    #[test]
    fn test_commit_replay() {
        let (_dir, wal_path) = temp_wal();

        {
            let mut wal = FencedWal::open(&wal_path).unwrap();
            for payload in [&b"committed1"[..], &b"committed2"[..]] {
                wal.append(payload.to_vec()).unwrap();
            }
            wal.commit(2).unwrap();
            wal.append(b"uncommitted".to_vec()).unwrap();
            wal.sync().unwrap();
        }

        // A new writer must only see the committed prefix.
        let mut wal = FencedWal::open(&wal_path).unwrap();
        let mut replayed: Vec<Vec<u8>> = Vec::new();
        wal.replay_committed(|entry| {
            replayed.push(entry.payload.clone());
            Ok(())
        })
        .unwrap();

        assert_eq!(
            replayed,
            vec![b"committed1".to_vec(), b"committed2".to_vec()]
        );
    }
}