Skip to main content

sochdb_storage/
txn_wal.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Transaction-Aware WAL for ACID Transactions
19//!
20//! This WAL implementation provides ACID transaction support:
21//! - Atomicity: All writes in a transaction are logged together
22//! - Consistency: Schema validation before commit
23//! - Isolation: Transaction IDs for MVCC
24//! - Durability: fsync after commit
25//!
26//! ## WAL Record Types
27//!
28//! - Data: Key-value write
29//! - TxnBegin: Start of transaction
30//! - TxnCommit: Commit point (durability guarantee)
31//! - TxnAbort: Transaction rollback
32//! - Checkpoint: Snapshot marker for recovery
33//! - SchemaChange: DDL operations
34//!
35//! ## Record Format (Fixed Layout for Torn Write Detection)
36//!
37//! ```text
38//! ┌───────────┬──────────┬─────────┬───────────┬─────────┬───────────┬─────────┬─────────┬──────────┐
39//! │ Length(4) │ Type(1)  │ TxnId(8)│ Timestamp │ KeyLen  │ ValueLen  │ Key(*)  │ Value(*)│ CRC32(4) │
40//! │           │          │         │   (8)     │  (4)    │   (4)     │         │         │          │
41//! └───────────┴──────────┴─────────┴───────────┴─────────┴───────────┴─────────┴─────────┴──────────┘
42//! ```
43//!
44//! ## Crash Recovery Guarantees
45//!
46//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
47//! 2. **Atomic Commit**: Commit record with fsync ensures durability
48//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
49//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
50
51use crate::encryption::EncryptionEngine;
52use byteorder::{LittleEndian, ReadBytesExt};
53use parking_lot::Mutex;
54use sochdb_core::{Result, SochDBError, WalRecordType};
55use std::cell::Cell;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::io::{BufReader, BufWriter, Read, Write};
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU64, Ordering};
62use std::time::{Instant, SystemTime, UNIX_EPOCH};
63
64// =============================================================================
65// Coarse-Grained Timestamp Caching (Recommendation 5)
66// =============================================================================
67
/// Cache validity period in nanoseconds (1ms - allows ~1000+ writes per refresh)
const CACHE_VALIDITY_NS: u64 = 1_000_000;

/// Read the wall clock as microseconds since the UNIX epoch.
///
/// This is the only place in the timestamp cache that calls `SystemTime::now()`.
///
/// # Panics
///
/// Panics if the system clock is set before 1970-01-01.
fn wall_clock_us() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock set before UNIX epoch (1970-01-01)")
        .as_micros() as u64
}

thread_local! {
    /// Thread-local timestamp cache: (Instant, cached_timestamp_us)
    ///
    /// Seeded from the real wall clock on first use. Seeding the timestamp
    /// with 0 would make every call during the first validity window return a
    /// value near the UNIX epoch instead of the current time.
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), wall_clock_us()));
}

/// Get cached timestamp in microseconds since the UNIX epoch
///
/// This function avoids a `SystemTime::now()` call per write by:
/// 1. Caching the wall-clock timestamp in thread-local storage
/// 2. Using monotonic `Instant` for sub-millisecond offsets
/// 3. Only re-reading the wall clock when the cache expires (every ~1ms)
///
/// ## Ordering
///
/// Within one cache window the returned value is non-decreasing, because it
/// is the cached base plus a monotonic `Instant` offset. On a refresh the
/// wall clock is read again, so a backwards step of the system clock can make
/// a later value smaller than an earlier one.
///
/// ## Panics
///
/// Panics if the system clock is set before 1970-01-01.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (captured_at, base_us) = cache.get();
        let elapsed_ns = captured_at.elapsed().as_nanos() as u64;

        if elapsed_ns < CACHE_VALIDITY_NS {
            // Fast path: cached base + monotonic offset (pure arithmetic).
            base_us + elapsed_ns / 1000
        } else {
            // Slow path: the window expired, re-read the wall clock.
            let refreshed_us = wall_clock_us();
            cache.set((Instant::now(), refreshed_us));
            refreshed_us
        }
    })
}
114
/// Header size in bytes (without key/value data)
///
/// Sum of the fixed-width fields of a plaintext frame, including the leading
/// 4-byte length prefix. Code that works on a record BODY (the bytes after the
/// length prefix) subtracts 4 from this value.
const RECORD_HEADER_SIZE: usize = 4 + 1 + 8 + 8 + 4 + 4; // length + type + txn_id + timestamp + key_len + value_len

/// Checksum size (CRC32), the trailing field of every record body
const CHECKSUM_SIZE: usize = 4;

/// Default capacity for transaction-local WAL buffer (32 KB - typical batch of 100-500 writes)
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 * 1024;
123
124// =============================================================================
125// At-rest encryption framing (Task 3B)
126// =============================================================================
127//
128// When the WAL's `EncryptionEngine` is enabled, each record is written as an
129// encrypted frame instead of the plaintext frame above:
130//
131// ```text
132//   plaintext frame:  [content_len:u32][type|txn|ts|klen|vlen|key|val|crc32]
133//   encrypted frame:  [outer_len:u32  ][ver|nonce(12)|ciphertext+tag(16)]
134// ```
135//
136// `outer_len` is the length of the AEAD envelope (ciphertext.len()); the inner
137// plaintext that the envelope protects is the record BODY — exactly the bytes
138// after `content_len` in the plaintext frame (`type..crc32`). On decrypt the
139// body is parsed by [`TxnWalEntry::parse_body`], reusing the same field layout +
140// CRC check as the plaintext path.
141//
142// The per-record AAD binds {format_version, db_uuid, dek_epoch, file-relative
143// record ordinal} so a record cannot be reordered, duplicated, spliced from
144// another DB, or read under a downgraded format without failing authentication.
145// The reader reconstructs the identical AAD from its own trusted state (the
146// keyring's db_uuid/epoch and a 0-based counter of records read from this file),
147// NEVER from the attacker-controllable on-disk bytes.
148
149/// Encode a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) — the bytes the
150/// AEAD envelope protects, identical to `to_bytes()` minus the length prefix.
151/// Used by the encrypted write paths to materialize a record before sealing it.
152fn encode_record_body(
153    record_type: WalRecordType,
154    txn_id: u64,
155    timestamp_us: u64,
156    key: &[u8],
157    value: &[u8],
158) -> Vec<u8> {
159    let body_len = (RECORD_HEADER_SIZE - 4) + key.len() + value.len() + CHECKSUM_SIZE;
160    let mut body = Vec::with_capacity(body_len);
161    let mut hasher = crc32fast::Hasher::new();
162    let rt = record_type as u8;
163    body.push(rt);
164    hasher.update(&[rt]);
165    let t = txn_id.to_le_bytes();
166    body.extend_from_slice(&t);
167    hasher.update(&t);
168    let ts = timestamp_us.to_le_bytes();
169    body.extend_from_slice(&ts);
170    hasher.update(&ts);
171    let kl = (key.len() as u32).to_le_bytes();
172    body.extend_from_slice(&kl);
173    hasher.update(&kl);
174    let vl = (value.len() as u32).to_le_bytes();
175    body.extend_from_slice(&vl);
176    hasher.update(&vl);
177    body.extend_from_slice(key);
178    hasher.update(key);
179    body.extend_from_slice(value);
180    hasher.update(value);
181    body.extend_from_slice(&hasher.finalize().to_le_bytes());
182    body
183}
184
/// AAD layout version for the WAL record binding. Part of the on-disk contract.
/// Written as the first byte of every per-record AAD.
const WAL_AAD_VERSION: u8 = 1;
/// AAD length: version(1) + db_uuid(16) + dek_epoch(4) + record_ordinal(8).
const WAL_AAD_LEN: usize = 1 + 16 + 4 + 8;
/// Upper bound (512 MiB) on a single WAL frame's on-disk length, to reject a
/// corrupted length prefix before it triggers a multi-GB `read_exact`
/// allocation/DoS. Compared against the `u32` length prefix of a frame.
const MAX_WAL_FRAME_LEN: u32 = 512 * 1024 * 1024;
192
193// =============================================================================
194// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
195// =============================================================================
196
/// Transaction-local WAL write buffer
///
/// Collects all writes during a transaction in memory, then flushes
/// everything with a SINGLE lock acquisition at commit time.
///
/// ## Performance Impact
///
/// ```text
/// Without TxnWalBuffer (current):
///   1000 writes × (lock + 9 write_all + unlock) = 1000 lock acquisitions
///   Lock overhead: 1000 × 20ns = 20µs per transaction
///
/// With TxnWalBuffer:
///   1000 writes buffered locally (NO LOCK)
///   1 flush at commit (SINGLE LOCK)
///   Lock overhead: 1 × 20ns = 20ns per transaction
///   Speedup: 1000× for lock overhead
/// ```
///
/// ## Usage
///
/// ```ignore
/// let mut buffer = TxnWalBuffer::new(txn_id);
///
/// // These do NOT acquire any locks
/// buffer.append(b"key1", b"value1");
/// buffer.append(b"key2", b"value2");
/// // ... hundreds of writes ...
///
/// // Hand the whole batch to the WAL in one call
/// buffer.flush_to_wal(&wal)?;
/// ```
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction ID stamped into every entry appended to this buffer
    txn_id: u64,
    /// Accumulated serialized entries, stored back to back as complete
    /// plaintext frames (length prefix through CRC32)
    buffer: Vec<u8>,
    /// Number of entries buffered
    entry_count: usize,
}
238
239impl TxnWalBuffer {
240    /// Create a new buffer for a transaction
241    #[inline]
242    pub fn new(txn_id: u64) -> Self {
243        Self {
244            txn_id,
245            buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
246            entry_count: 0,
247        }
248    }
249
250    /// Create with specific capacity
251    #[inline]
252    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
253        Self {
254            txn_id,
255            buffer: Vec::with_capacity(capacity),
256            entry_count: 0,
257        }
258    }
259
260    /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
261    ///
262    /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
263    /// This is completely lock-free and does not touch the file system.
264    ///
265    /// Uses cached timestamps to eliminate per-write syscall overhead.
266    #[inline]
267    pub fn append(&mut self, key: &[u8], value: &[u8]) {
268        // Use cached timestamp instead of syscall (Recommendation 5)
269        let timestamp_us = cached_timestamp_us();
270
271        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
272        let entry_start = self.buffer.len();
273
274        // Reserve space for length prefix (will fill at end)
275        self.buffer.extend_from_slice(&[0u8; 4]);
276
277        let mut hasher = crc32fast::Hasher::new();
278
279        // Record type (Data = 0)
280        let record_type_byte = WalRecordType::Data as u8;
281        self.buffer.push(record_type_byte);
282        hasher.update(&[record_type_byte]);
283
284        // Transaction ID
285        let txn_bytes = self.txn_id.to_le_bytes();
286        self.buffer.extend_from_slice(&txn_bytes);
287        hasher.update(&txn_bytes);
288
289        // Timestamp
290        let ts_bytes = timestamp_us.to_le_bytes();
291        self.buffer.extend_from_slice(&ts_bytes);
292        hasher.update(&ts_bytes);
293
294        // Key length
295        let key_len_bytes = (key.len() as u32).to_le_bytes();
296        self.buffer.extend_from_slice(&key_len_bytes);
297        hasher.update(&key_len_bytes);
298
299        // Value length
300        let val_len_bytes = (value.len() as u32).to_le_bytes();
301        self.buffer.extend_from_slice(&val_len_bytes);
302        hasher.update(&val_len_bytes);
303
304        // Key data
305        self.buffer.extend_from_slice(key);
306        hasher.update(key);
307
308        // Value data
309        self.buffer.extend_from_slice(value);
310        hasher.update(value);
311
312        // CRC32 checksum
313        self.buffer
314            .extend_from_slice(&hasher.finalize().to_le_bytes());
315
316        // Fill in length prefix (content length, not including the 4-byte length field)
317        let content_len = (total_len - 4) as u32;
318        self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
319
320        self.entry_count += 1;
321    }
322
323    /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
324    ///
325    /// Acquires the WAL lock once and writes all accumulated entries.
326    /// Returns the first sequence number assigned to buffered entries.
327    ///
328    /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
329    /// as it properly handles the private writer field.
330    #[inline]
331    pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
332        wal.flush_buffer(self)
333    }
334
335    /// Clear the buffer (for reuse)
336    #[inline]
337    pub fn clear(&mut self) {
338        self.buffer.clear();
339        self.entry_count = 0;
340    }
341
342    /// Get number of buffered entries
343    #[inline]
344    pub fn entry_count(&self) -> usize {
345        self.entry_count
346    }
347
348    /// Get total bytes buffered
349    #[inline]
350    pub fn bytes_buffered(&self) -> usize {
351        self.buffer.len()
352    }
353
354    /// Check if buffer is empty
355    #[inline]
356    pub fn is_empty(&self) -> bool {
357        self.buffer.is_empty()
358    }
359}
360
/// WAL entry for transaction-aware operations
///
/// In-memory form of one WAL record. The on-disk body layout is
/// `type|txn_id|timestamp|key_len|value_len|key|value|crc32`; the checksum is
/// not stored here and is recomputed from the fields when needed.
#[derive(Debug, Clone)]
pub struct TxnWalEntry {
    /// Record type
    pub record_type: WalRecordType,
    /// Transaction ID
    pub txn_id: u64,
    /// Timestamp in microseconds
    pub timestamp_us: u64,
    /// Key data (empty for every record kind except `Data`)
    pub key: Vec<u8>,
    /// Value data (the written value for `Data`, the schema payload for
    /// `SchemaChange`, empty for the other record kinds)
    pub value: Vec<u8>,
}
375
376impl TxnWalEntry {
377    /// Create a new data entry
378    pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
379        Self {
380            record_type: WalRecordType::Data,
381            txn_id,
382            timestamp_us: Self::now_us(),
383            key,
384            value,
385        }
386    }
387
388    /// Create a transaction begin entry
389    pub fn txn_begin(txn_id: u64) -> Self {
390        Self {
391            record_type: WalRecordType::TxnBegin,
392            txn_id,
393            timestamp_us: Self::now_us(),
394            key: Vec::new(),
395            value: Vec::new(),
396        }
397    }
398
399    /// Create a transaction commit entry
400    pub fn txn_commit(txn_id: u64) -> Self {
401        Self {
402            record_type: WalRecordType::TxnCommit,
403            txn_id,
404            timestamp_us: Self::now_us(),
405            key: Vec::new(),
406            value: Vec::new(),
407        }
408    }
409
410    /// Create a transaction abort entry
411    pub fn txn_abort(txn_id: u64) -> Self {
412        Self {
413            record_type: WalRecordType::TxnAbort,
414            txn_id,
415            timestamp_us: Self::now_us(),
416            key: Vec::new(),
417            value: Vec::new(),
418        }
419    }
420
421    /// Create a checkpoint entry
422    pub fn checkpoint(txn_id: u64) -> Self {
423        Self {
424            record_type: WalRecordType::Checkpoint,
425            txn_id,
426            timestamp_us: Self::now_us(),
427            key: Vec::new(),
428            value: Vec::new(),
429        }
430    }
431
432    /// Create a schema change entry
433    pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
434        Self {
435            record_type: WalRecordType::SchemaChange,
436            txn_id,
437            timestamp_us: Self::now_us(),
438            key: Vec::new(),
439            value: schema_data,
440        }
441    }
442
443    /// Get current time in microseconds (uses cached timestamp)
444    #[inline]
445    fn now_us() -> u64 {
446        cached_timestamp_us()
447    }
448
449    /// Calculate CRC32 checksum for this entry
450    ///
451    /// Uses crc32fast for portable, deterministic checksums.
452    /// The checksum covers all fields except the checksum itself.
453    /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
454    pub fn checksum(&self) -> u32 {
455        let mut hasher = crc32fast::Hasher::new();
456        hasher.update(&[self.record_type as u8]);
457        hasher.update(&self.txn_id.to_le_bytes());
458        hasher.update(&self.timestamp_us.to_le_bytes());
459        hasher.update(&(self.key.len() as u32).to_le_bytes());
460        hasher.update(&(self.value.len() as u32).to_le_bytes());
461        hasher.update(&self.key);
462        hasher.update(&self.value);
463        hasher.finalize()
464    }
465
466    /// Serialize to bytes with single-pass CRC calculation
467    ///
468    /// Optimized to calculate CRC while building the buffer,
469    /// avoiding a second pass over all data.
470    pub fn to_bytes(&self) -> Vec<u8> {
471        let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
472        let mut buf = Vec::with_capacity(total_len);
473        let mut hasher = crc32fast::Hasher::new();
474
475        // Length (not including the length field itself) - not included in CRC
476        let content_len = (total_len - 4) as u32;
477        buf.extend_from_slice(&content_len.to_le_bytes());
478
479        // Record type
480        let record_type_byte = self.record_type as u8;
481        buf.push(record_type_byte);
482        hasher.update(&[record_type_byte]);
483
484        // Transaction ID
485        let txn_bytes = self.txn_id.to_le_bytes();
486        buf.extend_from_slice(&txn_bytes);
487        hasher.update(&txn_bytes);
488
489        // Timestamp
490        let ts_bytes = self.timestamp_us.to_le_bytes();
491        buf.extend_from_slice(&ts_bytes);
492        hasher.update(&ts_bytes);
493
494        // Key length
495        let key_len_bytes = (self.key.len() as u32).to_le_bytes();
496        buf.extend_from_slice(&key_len_bytes);
497        hasher.update(&key_len_bytes);
498
499        // Value length
500        let val_len_bytes = (self.value.len() as u32).to_le_bytes();
501        buf.extend_from_slice(&val_len_bytes);
502        hasher.update(&val_len_bytes);
503
504        // Key data
505        buf.extend_from_slice(&self.key);
506        hasher.update(&self.key);
507
508        // Value data
509        buf.extend_from_slice(&self.value);
510        hasher.update(&self.value);
511
512        // CRC32 Checksum (computed in single pass above)
513        buf.extend_from_slice(&hasher.finalize().to_le_bytes());
514
515        buf
516    }
517
518    /// Deserialize a PLAINTEXT WAL frame from a reader, with torn-write detection.
519    ///
520    /// This is the legacy / unencrypted reader. The live replay path on an
521    /// encrypted WAL does NOT use this — it uses [`TxnWal::read_record`], which
522    /// decrypts first and then calls [`Self::parse_body`]. Keeping this method
523    /// plaintext-only ensures a stray caller can never silently mis-parse
524    /// ciphertext as a plaintext frame.
525    ///
526    /// Returns error if:
527    /// - Length field indicates data is too short or implausibly large
528    /// - CRC32 checksum mismatch (corruption or torn write)
529    /// - Invalid record type
530    pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
531        // Length (allows torn write detection - if length claims more data than exists)
532        let content_len = reader.read_u32::<LittleEndian>()?;
533        if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
534            return Err(SochDBError::Corruption("WAL entry too short".into()));
535        }
536        if content_len > MAX_WAL_FRAME_LEN {
537            return Err(SochDBError::Corruption(format!(
538                "WAL entry length {content_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
539            )));
540        }
541
542        // Read the whole body, then parse. A short read here (torn tail) surfaces
543        // as Io(UnexpectedEof), which replay treats as a clean end-of-WAL.
544        let mut body = vec![0u8; content_len as usize];
545        reader.read_exact(&mut body)?;
546        Self::parse_body(&body)
547    }
548
549    /// Parse a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) and verify its
550    /// CRC. This is the bytes after the `content_len` prefix in a plaintext frame,
551    /// and is exactly what the AEAD envelope protects in an encrypted frame — so
552    /// both the plaintext and decrypt paths share this single parser.
553    pub fn parse_body(body: &[u8]) -> Result<Self> {
554        let mut cur = std::io::Cursor::new(body);
555
556        let record_type_byte = cur.read_u8()?;
557        let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
558            SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
559        })?;
560
561        let txn_id = cur.read_u64::<LittleEndian>()?;
562        let timestamp_us = cur.read_u64::<LittleEndian>()?;
563        let key_len = cur.read_u32::<LittleEndian>()? as usize;
564        let value_len = cur.read_u32::<LittleEndian>()? as usize;
565
566        let mut key = vec![0u8; key_len];
567        cur.read_exact(&mut key)?;
568        let mut value = vec![0u8; value_len];
569        cur.read_exact(&mut value)?;
570
571        let stored_checksum = cur.read_u32::<LittleEndian>()?;
572
573        let entry = Self {
574            record_type,
575            txn_id,
576            timestamp_us,
577            key,
578            value,
579        };
580
581        // Verify checksum - detects both corruption and torn writes
582        if entry.checksum() != stored_checksum {
583            return Err(SochDBError::Corruption(format!(
584                "WAL checksum mismatch for txn_id {}: expected {}, got {}",
585                txn_id,
586                entry.checksum(),
587                stored_checksum
588            )));
589        }
590
591        Ok(entry)
592    }
593
594    /// The record BODY bytes (`type..crc32`) — i.e. `to_bytes()` without the
595    /// leading 4-byte content_len. This is what gets fed to the AEAD on the
596    /// encrypted write path.
597    fn body_bytes(&self) -> Vec<u8> {
598        let full = self.to_bytes();
599        full[4..].to_vec()
600    }
601}
602
/// Transaction-aware Write-Ahead Log
///
/// Owns the append-mode file handle for one WAL file together with the
/// counters recovered from it when the WAL is opened.
pub struct TxnWal {
    /// Path to WAL file
    path: PathBuf,
    /// Buffered writer over the WAL file, guarded by a mutex
    writer: Mutex<BufWriter<File>>,
    /// Next transaction ID. On open this is set to `(PID << 32) + counter`,
    /// where `counter` is one past the highest counter already present in the
    /// file for this process's PID.
    next_txn_id: AtomicU64,
    /// Write sequence number. On open this is set to the number of records
    /// successfully read from the existing file.
    sequence: AtomicU64,
    /// Bytes written since last sync
    bytes_since_sync: AtomicU64,
    /// Cached timestamp (microseconds since epoch)
    /// Updated periodically to avoid syscall per write
    cached_timestamp_us: AtomicU64,
    /// At-rest encryption engine. `disabled()` (identity passthrough) for a
    /// plaintext WAL — in which case every write/read path is byte-identical to
    /// the pre-3B format. When enabled, records are written/read as encrypted
    /// frames (see the framing notes above).
    encryption: Arc<EncryptionEngine>,
    /// 16-byte database identity bound into each record's AAD (cross-DB splice
    /// defense). All-zero / unused when encryption is disabled.
    db_uuid: [u8; 16],
    /// Active DEK epoch bound into each record's AAD (downgrade-across-rotation
    /// defense). 0 / unused when encryption is disabled.
    dek_epoch: u32,
    /// File-relative count of records written to the CURRENT WAL file (reset on
    /// truncate). Used as the per-record AAD ordinal so reorder/duplication
    /// within a file fails authentication. All writes hold the `writer` lock, so
    /// this counter is only mutated under that lock; it is read locklessly by the
    /// (single-threaded) replay paths.
    records_in_file: AtomicU64,
}
636
637impl TxnWal {
638    /// Create a new plaintext WAL or open an existing one.
639    ///
640    /// Uses a disabled (passthrough) encryption engine, so the on-disk format is
641    /// byte-identical to the pre-encryption WAL. This is the constructor used by
642    /// every non-live caller (tests, tools); the live durable-storage path uses
643    /// [`Self::new_with_encryption`] to thread the keyring's engine in.
644    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
645        Self::new_with_encryption(path, Arc::new(EncryptionEngine::disabled()), [0u8; 16], 0)
646    }
647
648    /// Create/open a WAL with an explicit at-rest encryption engine.
649    ///
650    /// When `engine` is enabled, records are written and replayed as encrypted
651    /// frames bound to `db_uuid` + `dek_epoch` (see framing notes). When it is
652    /// disabled, this behaves exactly like [`Self::new`].
653    pub fn new_with_encryption<P: AsRef<Path>>(
654        path: P,
655        encryption: Arc<EncryptionEngine>,
656        db_uuid: [u8; 16],
657        dek_epoch: u32,
658    ) -> Result<Self> {
659        let path = path.as_ref().to_path_buf();
660
661        // Ensure parent directory exists
662        if let Some(parent) = path.parent() {
663            std::fs::create_dir_all(parent)?;
664        }
665
666        let file = OpenOptions::new()
667            .create(true)
668            .append(true)
669            .read(true)
670            .open(&path)?;
671
672        // Use 256KB buffer for better batch performance (default is 8KB)
673        // This reduces system calls when buffering many small writes
674        let now_us = cached_timestamp_us();
675
676        let wal = Self {
677            path,
678            writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
679            next_txn_id: AtomicU64::new(1),
680            sequence: AtomicU64::new(0),
681            bytes_since_sync: AtomicU64::new(0),
682            cached_timestamp_us: AtomicU64::new(now_us),
683            encryption,
684            db_uuid,
685            dek_epoch,
686            records_in_file: AtomicU64::new(0),
687        };
688
689        // Recover state from existing WAL
690        wal.recover_state()?;
691
692        Ok(wal)
693    }
694
695    /// Build the per-record AAD for a given file-relative ordinal. The reader
696    /// reconstructs this from its own trusted state, never from on-disk bytes.
697    #[inline]
698    fn record_aad(&self, ordinal: u64) -> [u8; WAL_AAD_LEN] {
699        let mut aad = [0u8; WAL_AAD_LEN];
700        aad[0] = WAL_AAD_VERSION;
701        aad[1..17].copy_from_slice(&self.db_uuid);
702        aad[17..21].copy_from_slice(&self.dek_epoch.to_le_bytes());
703        aad[21..29].copy_from_slice(&ordinal.to_le_bytes());
704        aad
705    }
706
707    /// Frame a single record body for the encrypted write path:
708    /// `[outer_len:u32][version|nonce|ciphertext+tag]`, AAD-bound to `ordinal`.
709    #[inline]
710    fn encrypt_frame(&self, body: &[u8], ordinal: u64) -> Result<Vec<u8>> {
711        let env = self
712            .encryption
713            .encrypt_with_aad(body, &self.record_aad(ordinal))?;
714        let mut out = Vec::with_capacity(4 + env.len());
715        out.extend_from_slice(&(env.len() as u32).to_le_bytes());
716        out.extend_from_slice(&env);
717        Ok(out)
718    }
719
720    /// Read the next record from a replay reader, decrypting if the WAL is
721    /// encrypted. Returns the record and advances `*ordinal`.
722    ///
723    /// Error taxonomy (the linchpin of fail-loud recovery):
724    /// - `Io(UnexpectedEof)` reading the length prefix or a short body read at
725    ///   the physical tail ⇒ clean torn-tail / end-of-WAL (callers tolerate).
726    /// - `Encryption(_)` (AEAD auth failure / wrong key / bad envelope) or
727    ///   `Corruption(_)` (CRC / bad framing) ⇒ HARD error; callers MUST abort
728    ///   recovery rather than treat it as EOF.
729    fn read_record<R: Read>(&self, reader: &mut R, ordinal: &mut u64) -> Result<TxnWalEntry> {
730        if !self.encryption.is_enabled() {
731            let entry = TxnWalEntry::from_reader(reader)?;
732            *ordinal += 1;
733            return Ok(entry);
734        }
735        // Encrypted frame: [outer_len][envelope]
736        let outer_len = reader.read_u32::<LittleEndian>()?; // EOF here = clean end
737        if outer_len > MAX_WAL_FRAME_LEN {
738            return Err(SochDBError::Corruption(format!(
739                "encrypted WAL frame length {outer_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
740            )));
741        }
742        let mut env = vec![0u8; outer_len as usize];
743        reader.read_exact(&mut env)?; // short read = torn tail (UnexpectedEof)
744        // Decrypt with the AAD reconstructed for THIS ordinal. A wrong key, a
745        // reordered/spliced record, or tampering fails here as Encryption(_),
746        // which is a hard error — never a silent EOF.
747        let body = self
748            .encryption
749            .decrypt_with_aad(&env, &self.record_aad(*ordinal))?;
750        let entry = TxnWalEntry::parse_body(&body)?;
751        *ordinal += 1;
752        Ok(entry)
753    }
754
755    /// Recover state (next txn ID, sequence) from existing WAL
756    ///
757    /// To avoid txn_id collisions when multiple processes open the same
758    /// WAL concurrently, we incorporate the PID into the starting txn_id.
759    /// Format: upper 32 bits = PID, lower 32 bits = counter.
760    /// This guarantees uniqueness across processes without coordination.
761    fn recover_state(&self) -> Result<()> {
762        let file = File::open(&self.path)?;
763        let mut reader = BufReader::new(file);
764        let mut count: u64 = 0;
765
766        // Track the max counter (lower 32 bits) for OUR PID only.
767        // Each process owns its own txn_id space: upper 32 bits = PID.
768        // We must NOT use max_txn_id across ALL PIDs, because that would
769        // place us into another PID's ID space and cause collisions.
770        let our_pid = std::process::id() as u64;
771        let pid_base = our_pid << 32;
772        let mut max_our_counter: u64 = 0;
773        let mut ordinal: u64 = 0;
774
775        loop {
776            match self.read_record(&mut reader, &mut ordinal) {
777                Ok(entry) => {
778                    count += 1;
779                    // Only track counters from entries that belong to our PID
780                    let entry_pid = entry.txn_id >> 32;
781                    if entry_pid == our_pid {
782                        let entry_counter = entry.txn_id & 0xFFFF_FFFF;
783                        if entry_counter > max_our_counter {
784                            max_our_counter = entry_counter;
785                        }
786                    }
787                }
788                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
789                    break;
790                }
791                Err(e) => {
792                    // Encrypted: a non-EOF error (AEAD auth failure / tamper /
793                    // corruption) is NEVER a torn tail — fail loud rather than
794                    // silently truncate. Plaintext: tolerate trailing corruption
795                    // as an incomplete final write (legacy behavior).
796                    if self.encryption.is_enabled() {
797                        return Err(e);
798                    }
799                    break;
800                }
801            }
802        }
803
804        // Start from pid_base + (max counter we've seen for our PID + 1).
805        // This ensures:
806        //   1. Unique across processes (different PID → different upper 32 bits)
807        //   2. Unique within this process even if PID is recycled (we skip
808        //      over any counters already used by a previous process with our PID)
809        let next_id = pid_base + max_our_counter + 1;
810
811        self.next_txn_id.store(next_id, Ordering::SeqCst);
812        self.sequence.store(count, Ordering::SeqCst);
813        // The current file already holds `count` records, so the next encrypted
814        // write must continue the AAD ordinal sequence from there.
815        self.records_in_file.store(count, Ordering::SeqCst);
816
817        Ok(())
818    }
819
820    /// Get cached timestamp, updating if stale (>1ms old)
821    ///
822    /// This avoids a syscall per write by caching the timestamp.
823    /// For WAL purposes, ~1ms granularity is sufficient.
824    #[inline]
825    fn get_cached_timestamp(&self) -> u64 {
826        // Fast path: use cached value (no syscall)
827        let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
828
829        // Refresh occasionally (every ~1000 writes or when sequence wraps)
830        // This is a very cheap check since sequence is incremented atomically anyway
831        let seq = self.sequence.load(Ordering::Relaxed);
832        if seq & 0x3FF == 0 {
833            // Refresh every 1024 writes
834            let now_us = cached_timestamp_us();
835            self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
836            return now_us;
837        }
838
839        cached
840    }
841
842    /// Append an entry to the WAL
843    ///
844    /// Returns the sequence number of this write.
845    ///
846    /// # Durability Contract
847    ///
848    /// This method writes data to the BufWriter but does **NOT** fsync.
849    /// The data is NOT durable until `flush()` + `sync()` are called.
850    ///
851    /// **For transaction commits, use `commit_transaction()` or `commit_durable()`**
852    /// which enforce fsync. This method is intentionally non-durable for
853    /// batching data writes within a transaction (pre-commit records).
854    ///
855    /// The group commit path (`EventDrivenGroupCommit`) is responsible for
856    /// calling `flush()` + `sync()` after batching multiple commit records.
857    pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
858        let mut writer = self.writer.lock();
859        let bytes = self.frame_under_lock(entry)?;
860
861        writer.write_all(&bytes)?;
862        // Don't flush here - BufWriter will batch writes automatically.
863        // Call flush() explicitly before sync() or commit().
864
865        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
866        self.bytes_since_sync
867            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
868
869        Ok(seq)
870    }
871
872    /// Serialize one entry into its on-disk frame, allocating the per-record
873    /// ordinal when encrypted. MUST be called with the `writer` lock held so the
874    /// ordinal matches the order bytes hit the file.
875    #[inline]
876    fn frame_under_lock(&self, entry: &TxnWalEntry) -> Result<Vec<u8>> {
877        if self.encryption.is_enabled() {
878            let ord = self.records_in_file.fetch_add(1, Ordering::SeqCst);
879            self.encrypt_frame(&entry.body_bytes(), ord)
880        } else {
881            Ok(entry.to_bytes())
882        }
883    }
884
885    /// Append entry without flushing (for batched writes)
886    ///
887    /// Caller must call flush() or sync() afterward to ensure durability.
888    #[inline]
889    pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
890        let mut writer = self.writer.lock();
891        let bytes = self.frame_under_lock(entry)?;
892
893        writer.write_all(&bytes)?;
894        // No flush - let BufWriter buffer the writes
895
896        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
897        self.bytes_since_sync
898            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
899
900        Ok(seq)
901    }
902
903    /// Write data without flushing (for batched writes within a transaction)
904    #[inline]
905    pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
906        let entry = TxnWalEntry::data(txn_id, key, value);
907        self.append_no_flush(&entry)
908    }
909
910    /// Write data from slices without any allocation
911    ///
912    /// This is the fastest path for writing - no intermediate Vec allocations.
913    /// Calculates CRC32 while serializing directly to BufWriter.
914    #[inline]
915    pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
916        // Use coarse timestamp (cached every ~1ms) instead of syscall per write
917        let timestamp_us = self.get_cached_timestamp();
918
919        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
920
921        let mut writer = self.writer.lock();
922
923        // Encrypted mode cannot stream field-by-field (the AEAD needs the whole
924        // body to compute its synthetic IV + tag), so materialize the body into a
925        // scratch buffer, seal it, and write one framed envelope. The plaintext
926        // path below keeps its zero-alloc streaming fast path untouched.
927        if self.encryption.is_enabled() {
928            let body = encode_record_body(WalRecordType::Data, txn_id, timestamp_us, key, value);
929            let ord = self.records_in_file.fetch_add(1, Ordering::SeqCst);
930            let frame = self.encrypt_frame(&body, ord)?;
931            writer.write_all(&frame)?;
932            let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
933            self.bytes_since_sync
934                .fetch_add(frame.len() as u64, Ordering::Relaxed);
935            return Ok(seq);
936        }
937
938        let mut hasher = crc32fast::Hasher::new();
939
940        // Length (not included in CRC)
941        let content_len = (total_len - 4) as u32;
942        writer.write_all(&content_len.to_le_bytes())?;
943
944        // Record type
945        let record_type_byte = WalRecordType::Data as u8;
946        writer.write_all(&[record_type_byte])?;
947        hasher.update(&[record_type_byte]);
948
949        // Transaction ID
950        let txn_bytes = txn_id.to_le_bytes();
951        writer.write_all(&txn_bytes)?;
952        hasher.update(&txn_bytes);
953
954        // Timestamp
955        let ts_bytes = timestamp_us.to_le_bytes();
956        writer.write_all(&ts_bytes)?;
957        hasher.update(&ts_bytes);
958
959        // Key length
960        let key_len_bytes = (key.len() as u32).to_le_bytes();
961        writer.write_all(&key_len_bytes)?;
962        hasher.update(&key_len_bytes);
963
964        // Value length
965        let val_len_bytes = (value.len() as u32).to_le_bytes();
966        writer.write_all(&val_len_bytes)?;
967        hasher.update(&val_len_bytes);
968
969        // Key data
970        writer.write_all(key)?;
971        hasher.update(key);
972
973        // Value data
974        writer.write_all(value)?;
975        hasher.update(value);
976
977        // CRC32 checksum
978        writer.write_all(&hasher.finalize().to_le_bytes())?;
979
980        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
981        self.bytes_since_sync
982            .fetch_add(total_len as u64, Ordering::Relaxed);
983
984        Ok(seq)
985    }
986
987    /// Flush pending writes to kernel buffer (but not to disk)
988    pub fn flush(&self) -> Result<()> {
989        let mut writer = self.writer.lock();
990        writer.flush()?;
991        Ok(())
992    }
993
994    /// Append and immediately sync for durability
995    pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
996        let seq = self.append(entry)?;
997        self.sync()?;
998        Ok(seq)
999    }
1000
1001    /// Force sync to disk.
1002    ///
1003    /// Must flush the BufWriter BEFORE fsync: `get_ref()` reaches the raw File
1004    /// and bypasses the 256 KB buffer, so without an explicit `flush()` the
1005    /// just-appended commit/checkpoint record may still sit in userspace and
1006    /// `sync_all()` would fsync stale bytes — silently breaking the durability
1007    /// guarantee `append_sync`/`commit`/`checkpoint` advertise.
1008    pub fn sync(&self) -> Result<()> {
1009        let mut writer = self.writer.lock();
1010        writer.flush()?;
1011        writer.get_ref().sync_all()?;
1012        self.bytes_since_sync.store(0, Ordering::Relaxed);
1013        Ok(())
1014    }
1015
1016    /// Flush a TxnWalBuffer with single lock acquisition
1017    ///
1018    /// This is the high-performance path for transaction commit:
1019    /// - All writes during the transaction are buffered locally
1020    /// - At commit, this method flushes everything with ONE lock
1021    ///
1022    /// ## Performance
1023    ///
1024    /// ```text
1025    /// 1000 writes with individual flush: 1000 × lock overhead
1026    /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
1027    /// Speedup: ~1000× for lock overhead
1028    /// ```
1029    #[inline]
1030    pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
1031        if buffer.is_empty() {
1032            return Ok(0);
1033        }
1034
1035        let mut writer = self.writer.lock();
1036
1037        if self.encryption.is_enabled() {
1038            // The buffer holds concatenated plaintext frames
1039            // `[content_len][body]...`. Re-frame each as an AEAD envelope bound to
1040            // its own file-relative ordinal, preserving per-record framing (so a
1041            // torn tail discards exactly one record).
1042            let buf = &buffer.buffer;
1043            let mut pos = 0usize;
1044            let mut total_written = 0u64;
1045            while pos + 4 <= buf.len() {
1046                let content_len =
1047                    u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
1048                        as usize;
1049                pos += 4;
1050                if pos + content_len > buf.len() {
1051                    return Err(SochDBError::Corruption(
1052                        "txn buffer truncated mid-record during encrypted flush".into(),
1053                    ));
1054                }
1055                let body = &buf[pos..pos + content_len];
1056                pos += content_len;
1057                let ord = self.records_in_file.fetch_add(1, Ordering::SeqCst);
1058                let frame = self.encrypt_frame(body, ord)?;
1059                writer.write_all(&frame)?;
1060                total_written += frame.len() as u64;
1061            }
1062            let seq = self
1063                .sequence
1064                .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1065            self.bytes_since_sync
1066                .fetch_add(total_written, Ordering::Relaxed);
1067            return Ok(seq);
1068        }
1069
1070        writer.write_all(&buffer.buffer)?;
1071
1072        let seq = self
1073            .sequence
1074            .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1075        self.bytes_since_sync
1076            .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
1077
1078        Ok(seq)
1079    }
1080
1081    /// Get the current size of the WAL file in bytes
1082    pub fn size_bytes(&self) -> u64 {
1083        std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
1084    }
1085
1086    /// Allocate a new transaction ID
1087    pub fn alloc_txn_id(&self) -> u64 {
1088        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1089    }
1090
1091    /// Begin a new transaction
1092    pub fn begin_transaction(&self) -> Result<u64> {
1093        let txn_id = self.alloc_txn_id();
1094        let entry = TxnWalEntry::txn_begin(txn_id);
1095        self.append(&entry)?;
1096        Ok(txn_id)
1097    }
1098
1099    /// Commit a transaction (with fsync for durability)
1100    ///
1101    /// This flushes all pending writes and then fsyncs the commit record.
1102    ///
1103    /// # Durability Guarantee
1104    ///
1105    /// After this method returns `Ok(())`, the commit record is durable on disk.
1106    /// The transaction is guaranteed to survive process crash, OS crash, and
1107    /// power failure (assuming the storage device honors fsync correctly).
1108    ///
1109    /// # Performance
1110    ///
1111    /// Each call performs an fsync (~5ms on HDD, ~0.1ms on NVMe).
1112    /// For high-throughput workloads, use `EventDrivenGroupCommit` which
1113    /// batches multiple commits into a single fsync via `commit_durable_batch()`.
1114    pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
1115        // First flush any pending buffered writes
1116        self.flush()?;
1117
1118        // Then write commit record with fsync
1119        let entry = TxnWalEntry::txn_commit(txn_id);
1120        self.append_sync(&entry)?;
1121        Ok(())
1122    }
1123
1124    /// Commit a batch of transactions with a single fsync (group commit).
1125    ///
1126    /// Writes commit records for all transaction IDs, then performs a single
1127    /// flush + fsync. This amortizes the fsync cost across N transactions,
1128    /// achieving ~N× throughput improvement over individual commits.
1129    ///
1130    /// # Durability Guarantee
1131    ///
1132    /// After this method returns `Ok(())`, ALL transactions in the batch
1133    /// are durable on disk. Either all commit records are visible after
1134    /// crash recovery, or none are (atomic batch durability).
1135    ///
1136    /// # Usage
1137    ///
1138    /// This method is called by `EventDrivenGroupCommit::flush_fn` to
1139    /// implement the group commit pattern. Do not call directly unless
1140    /// you are implementing your own commit batching.
1141    pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
1142        // Write all commit records without flushing (batch them in BufWriter)
1143        for &txn_id in txn_ids {
1144            let entry = TxnWalEntry::txn_commit(txn_id);
1145            self.append_no_flush(&entry)?;
1146        }
1147
1148        // Single flush + fsync for the entire batch
1149        self.flush()?;
1150        self.sync()?;
1151        Ok(())
1152    }
1153
1154    /// Abort a transaction
1155    pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
1156        let entry = TxnWalEntry::txn_abort(txn_id);
1157        self.append(&entry)?;
1158        Ok(())
1159    }
1160
1161    /// Write data within a transaction
1162    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1163        let entry = TxnWalEntry::data(txn_id, key, value);
1164        self.append(&entry)
1165    }
1166
1167    /// Replay WAL for crash recovery
1168    ///
1169    /// Returns (committed_writes, recovered_txn_count)
1170    #[allow(clippy::type_complexity)]
1171    pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
1172        let file = File::open(&self.path)?;
1173        let mut reader = BufReader::new(file);
1174
1175        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1176            std::collections::HashMap::new();
1177        let mut result = Vec::new();
1178        let mut txn_count = 0;
1179        let mut ordinal: u64 = 0;
1180
1181        // Single pass in WAL order. Transaction ids are process-local and
1182        // short-lived CLI processes can reuse the same ids after restart, so
1183        // grouping the entire WAL by txn_id would merge unrelated transactions
1184        // and replay older writes after newer ones.
1185        loop {
1186            match self.read_record(&mut reader, &mut ordinal) {
1187                Ok(entry) => match entry.record_type {
1188                    WalRecordType::TxnBegin => {
1189                        pending_writes.insert(entry.txn_id, Vec::new());
1190                    }
1191                    WalRecordType::Data => {
1192                        // Accept data for any txn_id we've seen a Begin for,
1193                        // and also for txn_ids without a Begin (they might have
1194                        // their Begin later in the WAL due to buffered writes).
1195                        pending_writes
1196                            .entry(entry.txn_id)
1197                            .or_insert_with(Vec::new)
1198                            .push((entry.key, entry.value));
1199                    }
1200                    WalRecordType::TxnCommit => {
1201                        if let Some(writes) = pending_writes.remove(&entry.txn_id) {
1202                            result.extend(writes);
1203                            txn_count += 1;
1204                        }
1205                    }
1206                    WalRecordType::TxnAbort => {
1207                        pending_writes.remove(&entry.txn_id);
1208                    }
1209                    _ => {}
1210                },
1211                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1212                    break;
1213                }
1214                Err(e) => {
1215                    // Encrypted WALs must fail loud on a non-EOF error so a wrong
1216                    // key / tamper / corruption can never masquerade as EOF and
1217                    // silently drop committed data. (Wrong key is already excluded
1218                    // earlier by the keyring canary, so this is genuine corruption
1219                    // or tampering.) Plaintext keeps the legacy torn-tail tolerance.
1220                    if self.encryption.is_enabled() {
1221                        return Err(e);
1222                    }
1223                    break;
1224                }
1225            }
1226        }
1227
1228        Ok((result, txn_count))
1229    }
1230
1231    /// Replay WAL with a callback
1232    pub fn replay<F>(&self, mut callback: F) -> Result<u64>
1233    where
1234        F: FnMut(TxnWalEntry) -> Result<()>,
1235    {
1236        let file = File::open(&self.path)?;
1237        let mut reader = BufReader::new(file);
1238        let mut count = 0u64;
1239        let mut ordinal: u64 = 0;
1240
1241        loop {
1242            match self.read_record(&mut reader, &mut ordinal) {
1243                Ok(entry) => {
1244                    callback(entry)?;
1245                    count += 1;
1246                }
1247                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1248                    break;
1249                }
1250                Err(e) => {
1251                    // Encrypted: fail loud (never silently drop committed data on a
1252                    // non-EOF error). Plaintext: tolerate a trailing incomplete entry.
1253                    if self.encryption.is_enabled() {
1254                        return Err(e);
1255                    }
1256                    eprintln!("WAL replay warning: {:?}", e);
1257                    break;
1258                }
1259            }
1260        }
1261
1262        Ok(count)
1263    }
1264
1265    /// Truncate WAL (called after successful checkpoint)
1266    ///
1267    /// Flushes any buffered writes, truncates the file to 0 bytes,
1268    /// and resets sequence counters. The file is opened in `O_APPEND`
1269    /// mode so subsequent writes will correctly start at offset 0.
1270    ///
1271    /// **WARNING**: After truncation, all data durability is lost.
1272    /// The in-memory memtable still holds data for the current session,
1273    /// but a crash after truncation means the data cannot be recovered
1274    /// from the WAL.
1275    pub fn truncate(&self) -> Result<()> {
1276        let mut writer = self.writer.lock();
1277        // Flush BufWriter so no stale data is written after truncation
1278        writer.flush()?;
1279        let file = writer.get_ref();
1280        file.set_len(0)?;
1281        file.sync_all()?;
1282        self.sequence.store(0, Ordering::SeqCst);
1283        self.bytes_since_sync.store(0, Ordering::Relaxed);
1284        // The file restarts at offset 0, so per-record AAD ordinals restart too;
1285        // a fresh reader of the truncated file will count from 0 to match.
1286        self.records_in_file.store(0, Ordering::SeqCst);
1287        Ok(())
1288    }
1289
1290    /// Write a checkpoint marker
1291    pub fn write_checkpoint(&self) -> Result<u64> {
1292        let entry = TxnWalEntry::checkpoint(0);
1293        self.append_sync(&entry)
1294    }
1295
1296    /// Write a Compensation Log Record (CLR) for undo operations
1297    ///
1298    /// CLRs are redo-only records that skip past already undone operations
1299    /// during recovery. The undo_next_lsn field tells recovery where to
1300    /// continue undoing after this CLR.
1301    pub fn append_clr(
1302        &self,
1303        txn_id: u64,
1304        _original_lsn: u64,
1305        undo_next_lsn: Option<u64>,
1306        undo_data: &[u8],
1307    ) -> Result<u64> {
1308        // Encode undo_next_lsn into the key field
1309        let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1310        let entry = TxnWalEntry {
1311            record_type: WalRecordType::CompensationLogRecord,
1312            txn_id,
1313            timestamp_us: TxnWalEntry::now_us(),
1314            key, // undo_next_lsn encoded in key
1315            value: undo_data.to_vec(),
1316        };
1317        self.append(&entry)
1318    }
1319
1320    /// Write checkpoint with data (for fuzzy checkpoints)
1321    pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1322        let entry = TxnWalEntry {
1323            record_type: WalRecordType::Checkpoint,
1324            txn_id: 0,
1325            timestamp_us: TxnWalEntry::now_us(),
1326            key: Vec::new(),
1327            value: checkpoint_data.to_vec(),
1328        };
1329        self.append_sync(&entry)
1330    }
1331
1332    /// Write checkpoint end with captured state
1333    pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1334        let entry = TxnWalEntry {
1335            record_type: WalRecordType::CheckpointEnd,
1336            txn_id: 0,
1337            timestamp_us: TxnWalEntry::now_us(),
1338            key: Vec::new(),
1339            value: checkpoint_data.to_vec(),
1340        };
1341        self.append_sync(&entry)
1342    }
1343
1344    /// Get current sequence number
1345    pub fn sequence(&self) -> u64 {
1346        self.sequence.load(Ordering::SeqCst)
1347    }
1348
1349    /// Get bytes written since last sync
1350    pub fn bytes_since_sync(&self) -> u64 {
1351        self.bytes_since_sync.load(Ordering::Relaxed)
1352    }
1353
1354    /// Get path to WAL file
1355    pub fn path(&self) -> &Path {
1356        &self.path
1357    }
1358}
1359
/// Point-in-time snapshot of a WAL's counters.
#[derive(Debug, Clone, Default)]
pub struct TxnWalStats {
    /// Value of the write sequence counter when the snapshot was taken.
    pub entries_written: u64,
    /// Bytes appended since the most recent sync.
    pub bytes_since_sync: u64,
    /// The id the next allocated transaction will receive.
    pub next_txn_id: u64,
}
1370
1371// ============================================================================
1372// Sharded WAL for Reduced Mutex Contention
1373// ============================================================================
1374
1375/// Sharded Write-Ahead Log for high-concurrency workloads
1376///
1377/// Instead of a single Mutex<File>, uses multiple shards to reduce contention:
1378/// - Writers hash to shard by txn_id
1379/// - Each shard has its own buffer
1380/// - Central coordinator handles fsync ordering
1381///
1382/// Reduces contention from O(1) bottleneck to O(num_shards) parallelism.
1383#[allow(dead_code)]
1384pub struct ShardedWal {
1385    /// Shard writers (txn_id % num_shards selects shard)
1386    shards: Vec<parking_lot::Mutex<WalShard>>,
1387    /// Number of shards (power of 2)
1388    num_shards: usize,
1389    /// Central WAL file for ordered writes
1390    central_writer: parking_lot::Mutex<BufWriter<File>>,
1391    /// Next transaction ID
1392    next_txn_id: AtomicU64,
1393    /// Write sequence (global ordering)
1394    sequence: AtomicU64,
1395    /// Path
1396    path: PathBuf,
1397}
1398
1399/// Individual WAL shard buffer
1400struct WalShard {
1401    /// Buffered entries for this shard
1402    buffer: Vec<u8>,
1403    /// Number of entries buffered
1404    entry_count: usize,
1405}
1406
1407impl WalShard {
1408    fn new() -> Self {
1409        Self {
1410            buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1411            entry_count: 0,
1412        }
1413    }
1414
1415    fn append(&mut self, entry: &TxnWalEntry) {
1416        let bytes = entry.to_bytes();
1417        self.buffer.extend_from_slice(&bytes);
1418        self.entry_count += 1;
1419    }
1420
1421    fn is_empty(&self) -> bool {
1422        self.buffer.is_empty()
1423    }
1424
1425    fn drain(&mut self) -> Vec<u8> {
1426        self.entry_count = 0;
1427        std::mem::take(&mut self.buffer)
1428    }
1429}
1430
1431impl ShardedWal {
1432    /// Create sharded WAL with specified number of shards
1433    ///
1434    /// Recommended: 4-8 shards for typical server workloads
1435    pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1436        let path = path.as_ref().to_path_buf();
1437
1438        if let Some(parent) = path.parent() {
1439            std::fs::create_dir_all(parent)?;
1440        }
1441
1442        let file = std::fs::OpenOptions::new()
1443            .create(true)
1444            .append(true)
1445            .read(true)
1446            .open(&path)?;
1447
1448        // Round up to power of 2 for fast modulo
1449        let num_shards = num_shards.next_power_of_two();
1450        let shards: Vec<_> = (0..num_shards)
1451            .map(|_| parking_lot::Mutex::new(WalShard::new()))
1452            .collect();
1453
1454        Ok(Self {
1455            shards,
1456            num_shards,
1457            central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1458            next_txn_id: AtomicU64::new(1),
1459            sequence: AtomicU64::new(0),
1460            path,
1461        })
1462    }
1463
1464    /// Get shard index for transaction
1465    #[inline]
1466    fn shard_idx(&self, txn_id: u64) -> usize {
1467        (txn_id as usize) & (self.num_shards - 1)
1468    }
1469
1470    /// Append entry to appropriate shard (lock-free for different txns)
1471    pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1472        let shard_idx = self.shard_idx(entry.txn_id);
1473        let mut shard = self.shards[shard_idx].lock();
1474        shard.append(entry);
1475        self.sequence.fetch_add(1, Ordering::SeqCst)
1476    }
1477
1478    /// Allocate transaction ID
1479    pub fn alloc_txn_id(&self) -> u64 {
1480        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1481    }
1482
1483    /// Flush all shard buffers to central file
1484    pub fn flush(&self) -> Result<()> {
1485        let mut central = self.central_writer.lock();
1486
1487        // Collect all shard buffers (brief lock per shard)
1488        for shard in &self.shards {
1489            let mut shard_guard = shard.lock();
1490            if !shard_guard.is_empty() {
1491                let data = shard_guard.drain();
1492                central.write_all(&data)?;
1493            }
1494        }
1495
1496        central.flush()?;
1497        Ok(())
1498    }
1499
1500    /// Sync to disk (fsync)
1501    pub fn sync(&self) -> Result<()> {
1502        self.flush()?;
1503        let central = self.central_writer.lock();
1504        central.get_ref().sync_all()?;
1505        Ok(())
1506    }
1507
1508    /// Begin transaction
1509    pub fn begin_transaction(&self) -> Result<u64> {
1510        let txn_id = self.alloc_txn_id();
1511        let entry = TxnWalEntry::txn_begin(txn_id);
1512        self.append(&entry);
1513        Ok(txn_id)
1514    }
1515
1516    /// Write data
1517    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1518        let entry = TxnWalEntry::data(txn_id, key, value);
1519        Ok(self.append(&entry))
1520    }
1521
1522    /// Commit transaction
1523    pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1524        let entry = TxnWalEntry::txn_commit(txn_id);
1525        let seq = self.append(&entry);
1526        self.sync()?; // Fsync on commit for durability
1527        Ok(seq)
1528    }
1529
1530    /// Get statistics
1531    pub fn stats(&self) -> ShardedWalStats {
1532        let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1533        for shard in &self.shards {
1534            shard_entry_counts.push(shard.lock().entry_count);
1535        }
1536
1537        ShardedWalStats {
1538            num_shards: self.num_shards,
1539            total_entries: self.sequence.load(Ordering::SeqCst),
1540            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1541            shard_entry_counts,
1542        }
1543    }
1544}
1545
/// Point-in-time snapshot of a `ShardedWal`'s counters.
#[derive(Debug, Clone)]
pub struct ShardedWalStats {
    /// Number of shards the WAL was built with.
    pub num_shards: usize,
    /// Value of the global sequence counter, i.e. records appended so far.
    pub total_entries: u64,
    /// The id the next allocated transaction will receive.
    pub next_txn_id: u64,
    /// Entries currently staged in each shard, indexed by shard.
    pub shard_entry_counts: Vec<usize>,
}
1554
/// Counters describing one crash-recovery pass over the WAL.
#[derive(Debug, Clone, Default)]
pub struct CrashRecoveryStats {
    /// Records successfully read from the WAL.
    pub total_records: u64,
    /// Transactions found to be committed.
    pub committed_txns: u64,
    /// Transactions that never committed and were rolled back.
    pub rolled_back_txns: u64,
    /// Transactions ended by an explicit abort record.
    pub aborted_txns: u64,
    /// Data writes recovered from committed transactions.
    pub recovered_writes: u64,
    /// Torn or corrupted records at the end of the log (expected after a crash).
    pub torn_records: u64,
    /// Bytes read from the WAL.
    pub bytes_read: u64,
    /// Time the recovery took, in microseconds.
    pub recovery_duration_us: u64,
    /// Largest transaction id encountered (for restarting the id counter).
    pub max_txn_id: u64,
}
1577
1578impl TxnWal {
1579    /// Get WAL statistics
1580    pub fn stats(&self) -> TxnWalStats {
1581        TxnWalStats {
1582            entries_written: self.sequence.load(Ordering::SeqCst),
1583            bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1584            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1585        }
1586    }
1587
1588    /// Full crash recovery with detailed statistics
1589    ///
1590    /// This method provides ACID recovery guarantees:
1591    /// 1. **Atomicity**: Uncommitted transactions are rolled back
1592    /// 2. **Durability**: All committed transactions are replayed
1593    /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1594    ///
1595    /// Returns (committed_writes, stats) where committed_writes contains
1596    /// all key-value pairs from committed transactions in order.
1597    #[allow(clippy::type_complexity)]
1598    pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1599        let start_time = std::time::Instant::now();
1600        let file = File::open(&self.path)?;
1601        let file_size = file.metadata()?.len();
1602        let mut reader = BufReader::new(file);
1603
1604        let mut stats = CrashRecoveryStats {
1605            bytes_read: file_size,
1606            ..Default::default()
1607        };
1608
1609        let mut committed_txns: HashSet<u64> = HashSet::new();
1610        let mut aborted_txns: HashSet<u64> = HashSet::new();
1611        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1612            std::collections::HashMap::new();
1613        let mut all_txns: HashSet<u64> = HashSet::new();
1614
1615        // Read all records, stopping at first corruption (torn write)
1616        let mut ordinal: u64 = 0;
1617        loop {
1618            match self.read_record(&mut reader, &mut ordinal) {
1619                Ok(entry) => {
1620                    stats.total_records += 1;
1621                    if entry.txn_id > stats.max_txn_id {
1622                        stats.max_txn_id = entry.txn_id;
1623                    }
1624
1625                    match entry.record_type {
1626                        WalRecordType::TxnBegin => {
1627                            pending_writes.insert(entry.txn_id, Vec::new());
1628                            all_txns.insert(entry.txn_id);
1629                        }
1630                        WalRecordType::Data => {
1631                            if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1632                                writes.push((entry.key, entry.value));
1633                            }
1634                        }
1635                        WalRecordType::TxnCommit => {
1636                            committed_txns.insert(entry.txn_id);
1637                        }
1638                        WalRecordType::TxnAbort => {
1639                            pending_writes.remove(&entry.txn_id);
1640                            aborted_txns.insert(entry.txn_id);
1641                        }
1642                        _ => {}
1643                    }
1644                }
1645                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1646                    // Clean EOF
1647                    break;
1648                }
1649                Err(e) => {
1650                    // Encrypted: an AEAD auth failure / tamper / corruption is a
1651                    // hard error — abort recovery rather than silently truncating
1652                    // committed data (wrong key is already caught by the keyring
1653                    // canary at open). Plaintext: treat trailing corruption as a
1654                    // torn write and stop.
1655                    if self.encryption.is_enabled() {
1656                        return Err(e);
1657                    }
1658                    stats.torn_records += 1;
1659                    break;
1660                }
1661            }
1662        }
1663
1664        // Collect committed writes
1665        let mut result = Vec::new();
1666        for (txn_id, writes) in &pending_writes {
1667            if committed_txns.contains(txn_id) {
1668                stats.committed_txns += 1;
1669                stats.recovered_writes += writes.len() as u64;
1670                result.extend(writes.clone());
1671            }
1672        }
1673
1674        // Count aborted and rolled-back transactions
1675        stats.aborted_txns = aborted_txns.len() as u64;
1676        stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1677
1678        stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1679
1680        Ok((result, stats))
1681    }
1682}
1683
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Serializing a data entry and parsing it back must preserve its fields.
    #[test]
    fn test_wal_entry_roundtrip() {
        let encoded = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec()).to_bytes();

        let mut source = std::io::Cursor::new(encoded);
        let decoded = TxnWalEntry::from_reader(&mut source).unwrap();

        assert_eq!(decoded.record_type, WalRecordType::Data);
        assert_eq!(decoded.txn_id, 42);
        assert_eq!(decoded.key, b"key");
        assert_eq!(decoded.value, b"value");
    }

    /// A committed transaction written by one WAL handle is replayed, in
    /// order, by a fresh handle on the same file.
    #[test]
    fn test_wal_append_and_replay() {
        let tmp = tempdir().unwrap();
        let log_file = tmp.path().join("test.wal");

        // Phase 1: one committed transaction carrying two writes.
        let writer = TxnWal::new(&log_file).unwrap();
        let id = writer.begin_transaction().unwrap();
        for (k, v) in [(b"k1", b"v1"), (b"k2", b"v2")] {
            writer.write(id, k.to_vec(), v.to_vec()).unwrap();
        }
        writer.commit_transaction(id).unwrap();
        drop(writer);

        // Phase 2: reopen and replay.
        let reopened = TxnWal::new(&log_file).unwrap();
        let (replayed, committed) = reopened.replay_for_recovery().unwrap();

        assert_eq!(committed, 1);
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0], (b"k1".to_vec(), b"v1".to_vec()));
        assert_eq!(replayed[1], (b"k2".to_vec(), b"v2".to_vec()));
    }

    /// Writes of a transaction that never committed must not be replayed.
    #[test]
    fn test_uncommitted_transaction_rollback() {
        let tmp = tempdir().unwrap();
        let log_file = tmp.path().join("test.wal");

        let writer = TxnWal::new(&log_file).unwrap();

        // First transaction: committed.
        let done = writer.begin_transaction().unwrap();
        writer
            .write(done, b"committed".to_vec(), b"yes".to_vec())
            .unwrap();
        writer.commit_transaction(done).unwrap();

        // Second transaction: left open, as if the process crashed.
        let dangling = writer.begin_transaction().unwrap();
        writer
            .write(dangling, b"uncommitted".to_vec(), b"no".to_vec())
            .unwrap();
        drop(writer);

        // Only the committed transaction may come back.
        let reopened = TxnWal::new(&log_file).unwrap();
        let (replayed, committed) = reopened.replay_for_recovery().unwrap();

        assert_eq!(committed, 1); // Only committed transaction
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], (b"committed".to_vec(), b"yes".to_vec()));
    }

    /// An explicitly aborted transaction contributes nothing to replay.
    #[test]
    fn test_aborted_transaction() {
        let tmp = tempdir().unwrap();
        let log_file = tmp.path().join("test.wal");

        let writer = TxnWal::new(&log_file).unwrap();
        let id = writer.begin_transaction().unwrap();
        writer
            .write(id, b"aborted".to_vec(), b"data".to_vec())
            .unwrap();
        writer.abort_transaction(id).unwrap();
        drop(writer);

        let reopened = TxnWal::new(&log_file).unwrap();
        let (replayed, committed) = reopened.replay_for_recovery().unwrap();

        assert_eq!(committed, 0);
        assert!(replayed.is_empty());
    }

    /// Flipping the final byte of a serialized record must make parsing fail.
    #[test]
    fn test_checksum_validation() {
        let mut frame = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec()).to_bytes();

        // Damage the last byte of the frame.
        let last = frame.len() - 1;
        frame[last] ^= 0xFF;

        let mut source = std::io::Cursor::new(frame);
        let outcome = TxnWalEntry::from_reader(&mut source);

        assert!(outcome.is_err());
    }

    /// Mixed workload: two committed, one aborted, one unfinished transaction.
    #[test]
    fn test_crash_recovery_with_stats() {
        let tmp = tempdir().unwrap();
        let log_file = tmp.path().join("test.wal");

        let writer = TxnWal::new(&log_file).unwrap();

        // Committed transaction with two writes.
        let first = writer.begin_transaction().unwrap();
        for (k, v) in [(b"k1", b"v1"), (b"k2", b"v2")] {
            writer.write(first, k.to_vec(), v.to_vec()).unwrap();
        }
        writer.commit_transaction(first).unwrap();

        // Aborted transaction.
        let second = writer.begin_transaction().unwrap();
        writer
            .write(second, b"aborted_key".to_vec(), b"aborted_val".to_vec())
            .unwrap();
        writer.abort_transaction(second).unwrap();

        // Committed transaction with one write.
        let third = writer.begin_transaction().unwrap();
        writer.write(third, b"k3".to_vec(), b"v3".to_vec()).unwrap();
        writer.commit_transaction(third).unwrap();

        // Transaction left open to mimic a crash before commit.
        let fourth = writer.begin_transaction().unwrap();
        writer
            .write(fourth, b"uncommitted".to_vec(), b"data".to_vec())
            .unwrap();
        drop(writer);

        let reopened = TxnWal::new(&log_file).unwrap();
        let (recovered, report) = reopened.crash_recovery().unwrap();

        // Three writes in total, coming from the two committed transactions.
        assert_eq!(recovered.len(), 3);
        assert_eq!(report.committed_txns, 2);
        assert_eq!(report.aborted_txns, 1);
        assert_eq!(report.rolled_back_txns, 1); // the fourth one never committed
        assert_eq!(report.recovered_writes, 3);
        assert!(report.recovery_duration_us > 0);
    }

    /// Garbage appended after a valid transaction is reported as a torn
    /// record while the valid transaction is still recovered.
    #[test]
    fn test_torn_write_detection() {
        use std::io::Write;

        let tmp = tempdir().unwrap();
        let log_file = tmp.path().join("test.wal");

        // One good transaction.
        let writer = TxnWal::new(&log_file).unwrap();
        let id = writer.begin_transaction().unwrap();
        writer.write(id, b"key".to_vec(), b"value".to_vec()).unwrap();
        writer.commit_transaction(id).unwrap();
        drop(writer);

        // A partial record tacked onto the end of the file.
        let partial_record = [0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF];
        let mut raw_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&log_file)
            .unwrap();
        raw_file.write_all(&partial_record).unwrap();
        drop(raw_file);

        // Recovery succeeds and flags the torn tail.
        let reopened = TxnWal::new(&log_file).unwrap();
        let (recovered, report) = reopened.crash_recovery().unwrap();

        assert_eq!(recovered.len(), 1);
        assert_eq!(report.committed_txns, 1);
        assert_eq!(report.torn_records, 1);
    }

    /// Equal entries hash equally, different entries differ, and the checksum
    /// survives a serialization roundtrip.
    #[test]
    fn test_crc32_determinism() {
        // Build an entry with a pinned timestamp so only the value varies.
        let pinned = |value: &[u8]| {
            let mut e = TxnWalEntry::data(42, b"key".to_vec(), value.to_vec());
            e.timestamp_us = 12345;
            e
        };

        let original = pinned(b"value");
        let twin = pinned(b"value");
        assert_eq!(original.checksum(), twin.checksum());

        // Changing the value must change the checksum.
        let altered = pinned(b"different");
        assert_ne!(original.checksum(), altered.checksum());

        // The checksum is unchanged after encoding and decoding.
        let mut source = std::io::Cursor::new(original.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut source).unwrap();
        assert_eq!(decoded.checksum(), original.checksum());
    }
}
1908
#[cfg(test)]
mod encryption_wal_tests {
    use super::*;
    use crate::encryption::EncryptionEngine;
    use tempfile::tempdir;

    /// Build an enabled engine whose 32-byte key is `key` repeated.
    fn enc(key: u8) -> Arc<EncryptionEngine> {
        Arc::new(EncryptionEngine::new(&[key; 32]))
    }
    /// Fixed 16-byte identifier handed to `TxnWal::new_with_encryption`.
    const UUID: [u8; 16] = [9u8; 16];

    /// Write a committed txn (begin/data/commit) and one aborted txn via the
    /// encrypted paths, then reopen with the same key and crash-recover.
    #[test]
    fn encrypted_write_then_recover_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("enc.wal");

        {
            let wal = TxnWal::new_with_encryption(&path, enc(7), UUID, 0).unwrap();
            // committed txn via append + write_no_flush_refs + flush_buffer paths
            let t1 = wal.begin_transaction().unwrap();
            wal.write(t1, b"alpha".to_vec(), b"one".to_vec()).unwrap();
            wal.write_no_flush_refs(t1, b"beta", b"two").unwrap();
            // also exercise the buffer/flush_buffer batch path
            let mut buf = TxnWalBuffer::new(t1);
            buf.append(b"gamma", b"three");
            wal.flush_buffer(&buf).unwrap();
            wal.commit_transaction(t1).unwrap();

            // aborted txn must NOT survive
            let t2 = wal.begin_transaction().unwrap();
            wal.write(t2, b"ghost".to_vec(), b"x".to_vec()).unwrap();
            wal.abort_transaction(t2).unwrap();
            wal.sync().unwrap();
        }

        // On-disk bytes must NOT contain the plaintext values.
        let raw = std::fs::read(&path).unwrap();
        assert!(!contains(&raw, b"alpha"));
        assert!(!contains(&raw, b"three"));

        let wal = TxnWal::new_with_encryption(&path, enc(7), UUID, 0).unwrap();
        let (writes, stats) = wal.crash_recovery().unwrap();
        let keys: Vec<_> = writes.iter().map(|(k, _)| k.clone()).collect();
        assert!(keys.contains(&b"alpha".to_vec()));
        assert!(keys.contains(&b"beta".to_vec()));
        assert!(keys.contains(&b"gamma".to_vec()));
        assert!(!keys.contains(&b"ghost".to_vec()), "aborted txn leaked");
        assert_eq!(stats.committed_txns, 1);
    }

    /// Wrong key at the WAL layer must FAIL LOUD, not silently return empty.
    /// (In production the keyring canary catches this even earlier; this proves
    /// the replay path itself never swallows an AEAD failure as EOF.)
    #[test]
    fn wrong_key_fails_loud_not_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("enc.wal");
        {
            let wal = TxnWal::new_with_encryption(&path, enc(1), UUID, 0).unwrap();
            let t = wal.begin_transaction().unwrap();
            wal.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
            wal.commit_transaction(t).unwrap();
            wal.sync().unwrap();
        }
        // Reopen with the WRONG key: recover_state runs in the constructor and
        // must surface a hard error rather than an "empty" success.
        match TxnWal::new_with_encryption(&path, enc(2), UUID, 0) {
            Ok(_) => panic!("wrong key opened silently (data-loss bug)"),
            Err(SochDBError::Encryption(_)) => {}
            Err(other) => panic!("expected Encryption error, got {other:?}"),
        }
    }

    /// Tampering with a committed encrypted record must fail recovery loud.
    #[test]
    fn tamper_midstream_fails_loud() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("enc.wal");
        {
            let wal = TxnWal::new_with_encryption(&path, enc(5), UUID, 0).unwrap();
            let t = wal.begin_transaction().unwrap();
            wal.write(t, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.commit_transaction(t).unwrap();
            wal.sync().unwrap();
        }
        // Flip a byte well inside the file (not the final torn-tail region).
        let mut raw = std::fs::read(&path).unwrap();
        let mid = raw.len() / 2;
        raw[mid] ^= 0xFF;
        std::fs::write(&path, &raw).unwrap();

        // Either the constructor's recover_state errors, or a later crash_recovery
        // does — but it must NEVER silently accept tampered ciphertext.
        let failed = match TxnWal::new_with_encryption(&path, enc(5), UUID, 0) {
            Err(_) => true,
            Ok(wal) => wal.crash_recovery().is_err(),
        };
        assert!(failed, "tampered encrypted WAL was silently accepted");
    }

    /// The disabled engine must write a record's bytes EXACTLY as the legacy
    /// `to_bytes()` frame (no added encryption framing / format drift), while the
    /// enabled engine must NOT (it adds the AEAD envelope and is unreadable by the
    /// plaintext reader). Records embed wall-clock timestamps, so we compare a
    /// single fixed entry object against its own `to_bytes()` for determinism.
    #[test]
    fn disabled_engine_is_byte_identical_to_plaintext() {
        let dir = tempdir().unwrap();

        let entry = TxnWalEntry::data(42, b"k1".to_vec(), b"v1".to_vec());
        let golden = entry.to_bytes();

        // Disabled engine: file bytes == to_bytes() exactly.
        let p_dis = dir.path().join("disabled.wal");
        {
            let wal = TxnWal::new_with_encryption(
                &p_dis,
                Arc::new(EncryptionEngine::disabled()),
                [0u8; 16],
                0,
            )
            .unwrap();
            wal.append(&entry).unwrap();
            wal.sync().unwrap();
        }
        assert_eq!(
            std::fs::read(&p_dis).unwrap(),
            golden,
            "disabled-engine append diverged from legacy plaintext frame"
        );

        // Enabled engine: file bytes differ and are NOT plaintext-parseable.
        let p_enc = dir.path().join("enc.wal");
        {
            let wal = TxnWal::new_with_encryption(&p_enc, enc(3), UUID, 0).unwrap();
            wal.append(&entry).unwrap();
            wal.sync().unwrap();
        }
        let enc_bytes = std::fs::read(&p_enc).unwrap();
        assert_ne!(enc_bytes, golden);
        let mut cur = std::io::Cursor::new(&enc_bytes);
        assert!(
            TxnWalEntry::from_reader(&mut cur).is_err()
                || cur.position() as usize != enc_bytes.len(),
            "ciphertext frame must not parse cleanly as a plaintext record"
        );
    }

    /// Report whether `needle` occurs as a contiguous run inside `haystack`.
    ///
    /// An empty needle is treated as always present; the guard also avoids
    /// `slice::windows(0)`, which panics.
    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
        needle.is_empty() || haystack.windows(needle.len()).any(|w| w == needle)
    }
}