Skip to main content

sochdb_storage/
txn_wal.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Transaction-Aware WAL for ACID Transactions
19//!
20//! This WAL implementation provides ACID transaction support:
21//! - Atomicity: All writes in a transaction are logged together
22//! - Consistency: Schema validation before commit
23//! - Isolation: Transaction IDs for MVCC
24//! - Durability: fsync after commit
25//!
26//! ## WAL Record Types
27//!
28//! - Data: Key-value write
29//! - TxnBegin: Start of transaction
30//! - TxnCommit: Commit point (durability guarantee)
31//! - TxnAbort: Transaction rollback
32//! - Checkpoint: Snapshot marker for recovery
33//! - SchemaChange: DDL operations
34//!
35//! ## Record Format (Fixed Layout for Torn Write Detection)
36//!
37//! ```text
38//! ┌───────────┬──────────┬─────────┬───────────┬─────────┬───────────┬─────────┬─────────┬──────────┐
39//! │ Length(4) │ Type(1)  │ TxnId(8)│ Timestamp │ KeyLen  │ ValueLen  │ Key(*)  │ Value(*)│ CRC32(4) │
40//! │           │          │         │   (8)     │  (4)    │   (4)     │         │         │          │
41//! └───────────┴──────────┴─────────┴───────────┴─────────┴───────────┴─────────┴─────────┴──────────┘
42//! ```
43//!
44//! ## Crash Recovery Guarantees
45//!
46//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
47//! 2. **Atomic Commit**: Commit record with fsync ensures durability
48//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
49//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
50
51use crate::encryption::EncryptionEngine;
52use byteorder::{LittleEndian, ReadBytesExt};
53use parking_lot::Mutex;
54use sochdb_core::{Result, SochDBError, WalRecordType};
55use std::cell::Cell;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::io::{BufReader, BufWriter, Read, Write};
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU64, Ordering};
62use std::time::{Instant, SystemTime, UNIX_EPOCH};
63
64// =============================================================================
65// Coarse-Grained Timestamp Caching (Recommendation 5)
66// =============================================================================
67
68/// Cache validity period in nanoseconds (1ms - allows ~1000+ writes per refresh)
69const CACHE_VALIDITY_NS: u64 = 1_000_000;
70
71thread_local! {
72    /// Thread-local timestamp cache: (Instant, cached_timestamp_us)
73    /// Avoids syscall overhead by caching timestamp for ~1ms windows
74    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
75}
76
77/// Get cached timestamp in microseconds
78///
79/// This function eliminates per-write `SystemTime::now()` syscalls by:
80/// 1. Caching the wall-clock timestamp in thread-local storage
81/// 2. Using monotonic `Instant` for sub-millisecond offsets
82/// 3. Only refreshing from syscall when cache expires (every ~1ms)
83///
84/// ## Performance Impact
85///
86/// - Without caching: ~20-25ns per syscall × 1M writes = 20-25ms overhead
87/// - With caching: ~20ns per 1000 writes = 0.02ms overhead (1000× improvement)
88///
89/// ## Monotonicity Guarantee
90///
91/// Timestamps are guaranteed to be monotonically increasing within a thread
92/// due to the `elapsed` offset added to the cached base timestamp.
93#[inline(always)]
94pub fn cached_timestamp_us() -> u64 {
95    TS_CACHE.with(|cache| {
96        let (instant, ts) = cache.get();
97        let elapsed_ns = instant.elapsed().as_nanos() as u64;
98
99        if elapsed_ns < CACHE_VALIDITY_NS {
100            // Fast path: return cached + monotonic offset
101            // This is pure arithmetic, no syscall
102            ts + elapsed_ns / 1000
103        } else {
104            // Slow path: refresh cache from syscall
105            let new_ts = SystemTime::now()
106                .duration_since(UNIX_EPOCH)
107                .expect("system clock set before UNIX epoch (1970-01-01)")
108                .as_micros() as u64;
109            cache.set((Instant::now(), new_ts));
110            new_ts
111        }
112    })
113}
114
115/// Header size in bytes (without key/value data)
116const RECORD_HEADER_SIZE: usize = 4 + 1 + 8 + 8 + 4 + 4; // length + type + txn_id + timestamp + key_len + value_len
117
118/// Checksum size (CRC32)
119const CHECKSUM_SIZE: usize = 4;
120
121/// Default capacity for transaction-local WAL buffer (32 KB - typical batch of 100-500 writes)
122const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 * 1024;
123
124// =============================================================================
125// At-rest encryption framing (Task 3B)
126// =============================================================================
127//
128// When the WAL's `EncryptionEngine` is enabled, each record is written as an
129// encrypted frame instead of the plaintext frame above:
130//
131// ```text
132//   plaintext frame:  [content_len:u32][type|txn|ts|klen|vlen|key|val|crc32]
133//   encrypted frame:  [outer_len:u32  ][ver|nonce(12)|ciphertext+tag(16)]
134// ```
135//
136// `outer_len` is the length of the AEAD envelope (ciphertext.len()); the inner
137// plaintext that the envelope protects is the record BODY — exactly the bytes
138// after `content_len` in the plaintext frame (`type..crc32`). On decrypt the
139// body is parsed by [`TxnWalEntry::parse_body`], reusing the same field layout +
140// CRC check as the plaintext path.
141//
142// The per-record AAD binds {format_version, db_uuid, dek_epoch, file-relative
143// record ordinal} so a record cannot be reordered, duplicated, spliced from
144// another DB, or read under a downgraded format without failing authentication.
145// The reader reconstructs the identical AAD from its own trusted state (the
146// keyring's db_uuid/epoch and a 0-based counter of records read from this file),
147// NEVER from the attacker-controllable on-disk bytes.
148
149/// Encode a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) — the bytes the
150/// AEAD envelope protects, identical to `to_bytes()` minus the length prefix.
151/// Used by the encrypted write paths to materialize a record before sealing it.
152fn encode_record_body(
153    record_type: WalRecordType,
154    txn_id: u64,
155    timestamp_us: u64,
156    key: &[u8],
157    value: &[u8],
158) -> Vec<u8> {
159    let body_len = (RECORD_HEADER_SIZE - 4) + key.len() + value.len() + CHECKSUM_SIZE;
160    let mut body = Vec::with_capacity(body_len);
161    let mut hasher = crc32fast::Hasher::new();
162    let rt = record_type as u8;
163    body.push(rt);
164    hasher.update(&[rt]);
165    let t = txn_id.to_le_bytes();
166    body.extend_from_slice(&t);
167    hasher.update(&t);
168    let ts = timestamp_us.to_le_bytes();
169    body.extend_from_slice(&ts);
170    hasher.update(&ts);
171    let kl = (key.len() as u32).to_le_bytes();
172    body.extend_from_slice(&kl);
173    hasher.update(&kl);
174    let vl = (value.len() as u32).to_le_bytes();
175    body.extend_from_slice(&vl);
176    hasher.update(&vl);
177    body.extend_from_slice(key);
178    hasher.update(key);
179    body.extend_from_slice(value);
180    hasher.update(value);
181    body.extend_from_slice(&hasher.finalize().to_le_bytes());
182    body
183}
184
185/// A Point-in-Time Recovery target (Task 3B PITR). See
186/// [`TxnWal::replay_to_target`] for the prefix semantics.
187#[derive(Debug, Clone, Copy, PartialEq, Eq)]
188pub enum RecoveryTarget {
189    /// Recover the first `lsn` WAL records (exact). Matches the value of
190    /// `DurableStorage::current_lsn()` captured at the desired point.
191    Lsn(u64),
192    /// Roll the WAL forward and STOP at the first transaction whose COMMIT
193    /// timestamp (microseconds since the epoch) exceeds this value — that
194    /// transaction and everything after it are excluded. This is an exact
195    /// WAL-order PREFIX, NOT a timestamp filter: if commit timestamps are
196    /// non-monotonic in WAL order (coarse ~1ms clock, NTP steps, cross-thread
197    /// group commit), a later-in-WAL transaction with `ts <= t` that follows an
198    /// excluded commit is ALSO excluded. Prefer `Lsn` for an exact, clock-
199    /// independent cut.
200    Timestamp(u64),
201}
202
203/// AAD layout version for the WAL record binding. Part of the on-disk contract.
204const WAL_AAD_VERSION: u8 = 1;
205/// AAD length: version(1) + db_uuid(16) + dek_epoch(4) + record_ordinal(8).
206const WAL_AAD_LEN: usize = 1 + 16 + 4 + 8;
207/// Upper bound on a single WAL frame's on-disk length, to reject a corrupted
208/// length prefix before it triggers a multi-GB `read_exact` allocation/DoS.
209const MAX_WAL_FRAME_LEN: u32 = 512 * 1024 * 1024;
210
211// =============================================================================
212// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
213// =============================================================================
214
215/// Transaction-local WAL write buffer
216///
217/// Collects all writes during a transaction in memory, then flushes
218/// everything with a SINGLE lock acquisition at commit time.
219///
220/// ## Performance Impact
221///
222/// ```text
223/// Without TxnWalBuffer (current):
224///   1000 writes × (lock + 9 write_all + unlock) = 1000 lock acquisitions
225///   Lock overhead: 1000 × 20ns = 20µs per transaction
226///
227/// With TxnWalBuffer:
228///   1000 writes buffered locally (NO LOCK)
229///   1 flush at commit (SINGLE LOCK)
230///   Lock overhead: 1 × 20ns = 20ns per transaction
231///   Speedup: 1000× for lock overhead
232/// ```
233///
234/// ## Usage
235///
236/// ```ignore
237/// let mut buffer = TxnWalBuffer::new(txn_id);
238///
239/// // These do NOT acquire any locks
240/// buffer.append(b"key1", b"value1");
241/// buffer.append(b"key2", b"value2");
242/// // ... hundreds of writes ...
243///
244/// // Single lock acquisition, single write syscall
245/// buffer.flush(&wal)?;
246/// ```
247#[derive(Debug)]
248pub struct TxnWalBuffer {
249    /// Transaction ID for all entries
250    txn_id: u64,
251    /// Accumulated serialized entries
252    buffer: Vec<u8>,
253    /// Number of entries buffered
254    entry_count: usize,
255}
256
257impl TxnWalBuffer {
258    /// Create a new buffer for a transaction
259    #[inline]
260    pub fn new(txn_id: u64) -> Self {
261        Self {
262            txn_id,
263            buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
264            entry_count: 0,
265        }
266    }
267
268    /// Create with specific capacity
269    #[inline]
270    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
271        Self {
272            txn_id,
273            buffer: Vec::with_capacity(capacity),
274            entry_count: 0,
275        }
276    }
277
278    /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
279    ///
280    /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
281    /// This is completely lock-free and does not touch the file system.
282    ///
283    /// Uses cached timestamps to eliminate per-write syscall overhead.
284    #[inline]
285    pub fn append(&mut self, key: &[u8], value: &[u8]) {
286        // Use cached timestamp instead of syscall (Recommendation 5)
287        let timestamp_us = cached_timestamp_us();
288
289        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
290        let entry_start = self.buffer.len();
291
292        // Reserve space for length prefix (will fill at end)
293        self.buffer.extend_from_slice(&[0u8; 4]);
294
295        let mut hasher = crc32fast::Hasher::new();
296
297        // Record type (Data = 0)
298        let record_type_byte = WalRecordType::Data as u8;
299        self.buffer.push(record_type_byte);
300        hasher.update(&[record_type_byte]);
301
302        // Transaction ID
303        let txn_bytes = self.txn_id.to_le_bytes();
304        self.buffer.extend_from_slice(&txn_bytes);
305        hasher.update(&txn_bytes);
306
307        // Timestamp
308        let ts_bytes = timestamp_us.to_le_bytes();
309        self.buffer.extend_from_slice(&ts_bytes);
310        hasher.update(&ts_bytes);
311
312        // Key length
313        let key_len_bytes = (key.len() as u32).to_le_bytes();
314        self.buffer.extend_from_slice(&key_len_bytes);
315        hasher.update(&key_len_bytes);
316
317        // Value length
318        let val_len_bytes = (value.len() as u32).to_le_bytes();
319        self.buffer.extend_from_slice(&val_len_bytes);
320        hasher.update(&val_len_bytes);
321
322        // Key data
323        self.buffer.extend_from_slice(key);
324        hasher.update(key);
325
326        // Value data
327        self.buffer.extend_from_slice(value);
328        hasher.update(value);
329
330        // CRC32 checksum
331        self.buffer
332            .extend_from_slice(&hasher.finalize().to_le_bytes());
333
334        // Fill in length prefix (content length, not including the 4-byte length field)
335        let content_len = (total_len - 4) as u32;
336        self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
337
338        self.entry_count += 1;
339    }
340
341    /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
342    ///
343    /// Acquires the WAL lock once and writes all accumulated entries.
344    /// Returns the first sequence number assigned to buffered entries.
345    ///
346    /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
347    /// as it properly handles the private writer field.
348    #[inline]
349    pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
350        wal.flush_buffer(self)
351    }
352
353    /// Clear the buffer (for reuse)
354    #[inline]
355    pub fn clear(&mut self) {
356        self.buffer.clear();
357        self.entry_count = 0;
358    }
359
360    /// Get number of buffered entries
361    #[inline]
362    pub fn entry_count(&self) -> usize {
363        self.entry_count
364    }
365
366    /// Get total bytes buffered
367    #[inline]
368    pub fn bytes_buffered(&self) -> usize {
369        self.buffer.len()
370    }
371
372    /// Check if buffer is empty
373    #[inline]
374    pub fn is_empty(&self) -> bool {
375        self.buffer.is_empty()
376    }
377}
378
379/// WAL entry for transaction-aware operations
380#[derive(Debug, Clone)]
381pub struct TxnWalEntry {
382    /// Record type
383    pub record_type: WalRecordType,
384    /// Transaction ID
385    pub txn_id: u64,
386    /// Timestamp in microseconds
387    pub timestamp_us: u64,
388    /// Key data
389    pub key: Vec<u8>,
390    /// Value data
391    pub value: Vec<u8>,
392}
393
394impl TxnWalEntry {
395    /// Create a new data entry
396    pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
397        Self {
398            record_type: WalRecordType::Data,
399            txn_id,
400            timestamp_us: Self::now_us(),
401            key,
402            value,
403        }
404    }
405
406    /// Create a transaction begin entry
407    pub fn txn_begin(txn_id: u64) -> Self {
408        Self {
409            record_type: WalRecordType::TxnBegin,
410            txn_id,
411            timestamp_us: Self::now_us(),
412            key: Vec::new(),
413            value: Vec::new(),
414        }
415    }
416
417    /// Create a transaction commit entry
418    pub fn txn_commit(txn_id: u64) -> Self {
419        Self {
420            record_type: WalRecordType::TxnCommit,
421            txn_id,
422            timestamp_us: Self::now_us(),
423            key: Vec::new(),
424            value: Vec::new(),
425        }
426    }
427
428    /// Create a transaction abort entry
429    pub fn txn_abort(txn_id: u64) -> Self {
430        Self {
431            record_type: WalRecordType::TxnAbort,
432            txn_id,
433            timestamp_us: Self::now_us(),
434            key: Vec::new(),
435            value: Vec::new(),
436        }
437    }
438
439    /// Create a checkpoint entry
440    pub fn checkpoint(txn_id: u64) -> Self {
441        Self {
442            record_type: WalRecordType::Checkpoint,
443            txn_id,
444            timestamp_us: Self::now_us(),
445            key: Vec::new(),
446            value: Vec::new(),
447        }
448    }
449
450    /// Create a schema change entry
451    pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
452        Self {
453            record_type: WalRecordType::SchemaChange,
454            txn_id,
455            timestamp_us: Self::now_us(),
456            key: Vec::new(),
457            value: schema_data,
458        }
459    }
460
461    /// Get current time in microseconds (uses cached timestamp)
462    #[inline]
463    fn now_us() -> u64 {
464        cached_timestamp_us()
465    }
466
467    /// Calculate CRC32 checksum for this entry
468    ///
469    /// Uses crc32fast for portable, deterministic checksums.
470    /// The checksum covers all fields except the checksum itself.
471    /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
472    pub fn checksum(&self) -> u32 {
473        let mut hasher = crc32fast::Hasher::new();
474        hasher.update(&[self.record_type as u8]);
475        hasher.update(&self.txn_id.to_le_bytes());
476        hasher.update(&self.timestamp_us.to_le_bytes());
477        hasher.update(&(self.key.len() as u32).to_le_bytes());
478        hasher.update(&(self.value.len() as u32).to_le_bytes());
479        hasher.update(&self.key);
480        hasher.update(&self.value);
481        hasher.finalize()
482    }
483
484    /// Serialize to bytes with single-pass CRC calculation
485    ///
486    /// Optimized to calculate CRC while building the buffer,
487    /// avoiding a second pass over all data.
488    pub fn to_bytes(&self) -> Vec<u8> {
489        let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
490        let mut buf = Vec::with_capacity(total_len);
491        let mut hasher = crc32fast::Hasher::new();
492
493        // Length (not including the length field itself) - not included in CRC
494        let content_len = (total_len - 4) as u32;
495        buf.extend_from_slice(&content_len.to_le_bytes());
496
497        // Record type
498        let record_type_byte = self.record_type as u8;
499        buf.push(record_type_byte);
500        hasher.update(&[record_type_byte]);
501
502        // Transaction ID
503        let txn_bytes = self.txn_id.to_le_bytes();
504        buf.extend_from_slice(&txn_bytes);
505        hasher.update(&txn_bytes);
506
507        // Timestamp
508        let ts_bytes = self.timestamp_us.to_le_bytes();
509        buf.extend_from_slice(&ts_bytes);
510        hasher.update(&ts_bytes);
511
512        // Key length
513        let key_len_bytes = (self.key.len() as u32).to_le_bytes();
514        buf.extend_from_slice(&key_len_bytes);
515        hasher.update(&key_len_bytes);
516
517        // Value length
518        let val_len_bytes = (self.value.len() as u32).to_le_bytes();
519        buf.extend_from_slice(&val_len_bytes);
520        hasher.update(&val_len_bytes);
521
522        // Key data
523        buf.extend_from_slice(&self.key);
524        hasher.update(&self.key);
525
526        // Value data
527        buf.extend_from_slice(&self.value);
528        hasher.update(&self.value);
529
530        // CRC32 Checksum (computed in single pass above)
531        buf.extend_from_slice(&hasher.finalize().to_le_bytes());
532
533        buf
534    }
535
536    /// Deserialize a PLAINTEXT WAL frame from a reader, with torn-write detection.
537    ///
538    /// This is the legacy / unencrypted reader. The live replay path on an
539    /// encrypted WAL does NOT use this — it uses [`TxnWal::read_record`], which
540    /// decrypts first and then calls [`Self::parse_body`]. Keeping this method
541    /// plaintext-only ensures a stray caller can never silently mis-parse
542    /// ciphertext as a plaintext frame.
543    ///
544    /// Returns error if:
545    /// - Length field indicates data is too short or implausibly large
546    /// - CRC32 checksum mismatch (corruption or torn write)
547    /// - Invalid record type
548    pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
549        // Length (allows torn write detection - if length claims more data than exists)
550        let content_len = reader.read_u32::<LittleEndian>()?;
551        if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
552            return Err(SochDBError::Corruption("WAL entry too short".into()));
553        }
554        if content_len > MAX_WAL_FRAME_LEN {
555            return Err(SochDBError::Corruption(format!(
556                "WAL entry length {content_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
557            )));
558        }
559
560        // Read the whole body, then parse. A short read here (torn tail) surfaces
561        // as Io(UnexpectedEof), which replay treats as a clean end-of-WAL.
562        let mut body = vec![0u8; content_len as usize];
563        reader.read_exact(&mut body)?;
564        Self::parse_body(&body)
565    }
566
567    /// Parse a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) and verify its
568    /// CRC. This is the bytes after the `content_len` prefix in a plaintext frame,
569    /// and is exactly what the AEAD envelope protects in an encrypted frame — so
570    /// both the plaintext and decrypt paths share this single parser.
571    pub fn parse_body(body: &[u8]) -> Result<Self> {
572        let mut cur = std::io::Cursor::new(body);
573
574        let record_type_byte = cur.read_u8()?;
575        let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
576            SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
577        })?;
578
579        let txn_id = cur.read_u64::<LittleEndian>()?;
580        let timestamp_us = cur.read_u64::<LittleEndian>()?;
581        let key_len = cur.read_u32::<LittleEndian>()? as usize;
582        let value_len = cur.read_u32::<LittleEndian>()? as usize;
583
584        let mut key = vec![0u8; key_len];
585        cur.read_exact(&mut key)?;
586        let mut value = vec![0u8; value_len];
587        cur.read_exact(&mut value)?;
588
589        let stored_checksum = cur.read_u32::<LittleEndian>()?;
590
591        let entry = Self {
592            record_type,
593            txn_id,
594            timestamp_us,
595            key,
596            value,
597        };
598
599        // Verify checksum - detects both corruption and torn writes
600        if entry.checksum() != stored_checksum {
601            return Err(SochDBError::Corruption(format!(
602                "WAL checksum mismatch for txn_id {}: expected {}, got {}",
603                txn_id,
604                entry.checksum(),
605                stored_checksum
606            )));
607        }
608
609        Ok(entry)
610    }
611
612    /// The record BODY bytes (`type..crc32`) — i.e. `to_bytes()` without the
613    /// leading 4-byte content_len. This is what gets fed to the AEAD on the
614    /// encrypted write path.
615    fn body_bytes(&self) -> Vec<u8> {
616        let full = self.to_bytes();
617        full[4..].to_vec()
618    }
619}
620
621/// Transaction-aware Write-Ahead Log
622pub struct TxnWal {
623    /// Path to WAL file
624    path: PathBuf,
625    /// Buffered writer
626    writer: Mutex<BufWriter<File>>,
627    /// Next transaction ID
628    next_txn_id: AtomicU64,
629    /// Write sequence number
630    sequence: AtomicU64,
631    /// Bytes written since last sync
632    bytes_since_sync: AtomicU64,
633    /// Cached timestamp (microseconds since epoch)
634    /// Updated periodically to avoid syscall per write
635    cached_timestamp_us: AtomicU64,
636    /// At-rest encryption engine. `disabled()` (identity passthrough) for a
637    /// plaintext WAL — in which case every write/read path is byte-identical to
638    /// the pre-3B format. When enabled, records are written/read as encrypted
639    /// frames (see the framing notes above).
640    encryption: Arc<EncryptionEngine>,
641    /// 16-byte database identity bound into each record's AAD (cross-DB splice
642    /// defense). All-zero / unused when encryption is disabled.
643    db_uuid: [u8; 16],
644    /// Active DEK epoch bound into each record's AAD (downgrade-across-rotation
645    /// defense). 0 / unused when encryption is disabled.
646    dek_epoch: u32,
647    /// File-relative count of records written to the CURRENT WAL file (reset on
648    /// truncate). Used as the per-record AAD ordinal so reorder/duplication
649    /// within a file fails authentication. All writes hold the `writer` lock, so
650    /// this counter is only mutated under that lock; it is read locklessly by the
651    /// (single-threaded) replay paths.
652    records_in_file: AtomicU64,
653}
654
655impl TxnWal {
656    /// Create a new plaintext WAL or open an existing one.
657    ///
658    /// Uses a disabled (passthrough) encryption engine, so the on-disk format is
659    /// byte-identical to the pre-encryption WAL. This is the constructor used by
660    /// every non-live caller (tests, tools); the live durable-storage path uses
661    /// [`Self::new_with_encryption`] to thread the keyring's engine in.
662    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
663        Self::new_with_encryption(path, Arc::new(EncryptionEngine::disabled()), [0u8; 16], 0)
664    }
665
666    /// Create/open a WAL with an explicit at-rest encryption engine.
667    ///
668    /// When `engine` is enabled, records are written and replayed as encrypted
669    /// frames bound to `db_uuid` + `dek_epoch` (see framing notes). When it is
670    /// disabled, this behaves exactly like [`Self::new`].
671    ///
672    /// `pub(crate)` by design: the engine + `db_uuid` + `dek_epoch` MUST come
673    /// from [`crate::keyring::load_or_init`] so the fail-closed open contract
674    /// (MAC + canary) and the AAD binding are enforced. The only public door to
675    /// an encrypted WAL is
676    /// [`crate::durable_storage::DurableStorage::open_with_encryption`].
677    pub(crate) fn new_with_encryption<P: AsRef<Path>>(
678        path: P,
679        encryption: Arc<EncryptionEngine>,
680        db_uuid: [u8; 16],
681        dek_epoch: u32,
682    ) -> Result<Self> {
683        let path = path.as_ref().to_path_buf();
684
685        // Ensure parent directory exists
686        if let Some(parent) = path.parent() {
687            std::fs::create_dir_all(parent)?;
688        }
689
690        let file = OpenOptions::new()
691            .create(true)
692            .append(true)
693            .read(true)
694            .open(&path)?;
695
696        // Use 256KB buffer for better batch performance (default is 8KB)
697        // This reduces system calls when buffering many small writes
698        let now_us = cached_timestamp_us();
699
700        let wal = Self {
701            path,
702            writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
703            next_txn_id: AtomicU64::new(1),
704            sequence: AtomicU64::new(0),
705            bytes_since_sync: AtomicU64::new(0),
706            cached_timestamp_us: AtomicU64::new(now_us),
707            encryption,
708            db_uuid,
709            dek_epoch,
710            records_in_file: AtomicU64::new(0),
711        };
712
713        // Recover state from existing WAL
714        wal.recover_state()?;
715
716        Ok(wal)
717    }
718
719    /// Build the per-record AAD for a given file-relative ordinal. The reader
720    /// reconstructs this from its own trusted state, never from on-disk bytes.
721    #[inline]
722    fn record_aad(&self, ordinal: u64) -> [u8; WAL_AAD_LEN] {
723        let mut aad = [0u8; WAL_AAD_LEN];
724        aad[0] = WAL_AAD_VERSION;
725        aad[1..17].copy_from_slice(&self.db_uuid);
726        aad[17..21].copy_from_slice(&self.dek_epoch.to_le_bytes());
727        aad[21..29].copy_from_slice(&ordinal.to_le_bytes());
728        aad
729    }
730
731    /// Frame a single record body for the encrypted write path:
732    /// `[outer_len:u32][version|nonce|ciphertext+tag]`, AAD-bound to `ordinal`.
733    #[inline]
734    fn encrypt_frame(&self, body: &[u8], ordinal: u64) -> Result<Vec<u8>> {
735        let env = self
736            .encryption
737            .encrypt_with_aad(body, &self.record_aad(ordinal))?;
738        let mut out = Vec::with_capacity(4 + env.len());
739        out.extend_from_slice(&(env.len() as u32).to_le_bytes());
740        out.extend_from_slice(&env);
741        Ok(out)
742    }
743
744    /// Read the next record from a replay reader, decrypting if the WAL is
745    /// encrypted. Returns the record and advances `*ordinal`.
746    ///
747    /// Error taxonomy (the linchpin of fail-loud recovery):
748    /// - `Io(UnexpectedEof)` reading the length prefix or a short body read at
749    ///   the physical tail ⇒ clean torn-tail / end-of-WAL (callers tolerate).
750    /// - `Encryption(_)` (AEAD auth failure / wrong key / bad envelope) or
751    ///   `Corruption(_)` (CRC / bad framing) ⇒ HARD error; callers MUST abort
752    ///   recovery rather than treat it as EOF.
753    fn read_record<R: Read>(&self, reader: &mut R, ordinal: &mut u64) -> Result<TxnWalEntry> {
754        if !self.encryption.is_enabled() {
755            let entry = TxnWalEntry::from_reader(reader)?;
756            *ordinal += 1;
757            return Ok(entry);
758        }
759        // Encrypted frame: [outer_len][envelope]
760        let outer_len = reader.read_u32::<LittleEndian>()?; // EOF here = clean end
761        if outer_len > MAX_WAL_FRAME_LEN {
762            return Err(SochDBError::Corruption(format!(
763                "encrypted WAL frame length {outer_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
764            )));
765        }
766        let mut env = vec![0u8; outer_len as usize];
767        reader.read_exact(&mut env)?; // short read = torn tail (UnexpectedEof)
768        // Decrypt with the AAD reconstructed for THIS ordinal. A wrong key, a
769        // reordered/spliced record, or tampering fails here as Encryption(_),
770        // which is a hard error — never a silent EOF.
771        let body = self
772            .encryption
773            .decrypt_with_aad(&env, &self.record_aad(*ordinal))?;
774        let entry = TxnWalEntry::parse_body(&body)?;
775        *ordinal += 1;
776        Ok(entry)
777    }
778
779    /// Recover state (next txn ID, sequence) from existing WAL
780    ///
781    /// To avoid txn_id collisions when multiple processes open the same
782    /// WAL concurrently, we incorporate the PID into the starting txn_id.
783    /// Format: upper 32 bits = PID, lower 32 bits = counter.
784    /// This guarantees uniqueness across processes without coordination.
785    fn recover_state(&self) -> Result<()> {
786        let file = File::open(&self.path)?;
787        let mut reader = BufReader::new(file);
788        let mut count: u64 = 0;
789
790        // Track the max counter (lower 32 bits) for OUR PID only.
791        // Each process owns its own txn_id space: upper 32 bits = PID.
792        // We must NOT use max_txn_id across ALL PIDs, because that would
793        // place us into another PID's ID space and cause collisions.
794        let our_pid = std::process::id() as u64;
795        let pid_base = our_pid << 32;
796        let mut max_our_counter: u64 = 0;
797        let mut ordinal: u64 = 0;
798
799        loop {
800            match self.read_record(&mut reader, &mut ordinal) {
801                Ok(entry) => {
802                    count += 1;
803                    // Only track counters from entries that belong to our PID
804                    let entry_pid = entry.txn_id >> 32;
805                    if entry_pid == our_pid {
806                        let entry_counter = entry.txn_id & 0xFFFF_FFFF;
807                        if entry_counter > max_our_counter {
808                            max_our_counter = entry_counter;
809                        }
810                    }
811                }
812                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
813                    break;
814                }
815                Err(e) => {
816                    // Encrypted: a non-EOF error (AEAD auth failure / tamper /
817                    // corruption) is NEVER a torn tail — fail loud rather than
818                    // silently truncate. Plaintext: tolerate trailing corruption
819                    // as an incomplete final write (legacy behavior).
820                    if self.encryption.is_enabled() {
821                        return Err(e);
822                    }
823                    break;
824                }
825            }
826        }
827
828        // Start from pid_base + (max counter we've seen for our PID + 1).
829        // This ensures:
830        //   1. Unique across processes (different PID → different upper 32 bits)
831        //   2. Unique within this process even if PID is recycled (we skip
832        //      over any counters already used by a previous process with our PID)
833        let next_id = pid_base + max_our_counter + 1;
834
835        self.next_txn_id.store(next_id, Ordering::SeqCst);
836        self.sequence.store(count, Ordering::SeqCst);
837        // The current file already holds `count` records, so the next encrypted
838        // write must continue the AAD ordinal sequence from there.
839        self.records_in_file.store(count, Ordering::SeqCst);
840
841        Ok(())
842    }
843
844    /// Get cached timestamp, updating if stale (>1ms old)
845    ///
846    /// This avoids a syscall per write by caching the timestamp.
847    /// For WAL purposes, ~1ms granularity is sufficient.
848    #[inline]
849    fn get_cached_timestamp(&self) -> u64 {
850        // Fast path: use cached value (no syscall)
851        let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
852
853        // Refresh occasionally (every ~1000 writes or when sequence wraps)
854        // This is a very cheap check since sequence is incremented atomically anyway
855        let seq = self.sequence.load(Ordering::Relaxed);
856        if seq & 0x3FF == 0 {
857            // Refresh every 1024 writes
858            let now_us = cached_timestamp_us();
859            self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
860            return now_us;
861        }
862
863        cached
864    }
865
866    /// Append an entry to the WAL
867    ///
868    /// Returns the sequence number of this write.
869    ///
870    /// # Durability Contract
871    ///
872    /// This method writes data to the BufWriter but does **NOT** fsync.
873    /// The data is NOT durable until `flush()` + `sync()` are called.
874    ///
875    /// **For transaction commits, use `commit_transaction()` or `commit_durable()`**
876    /// which enforce fsync. This method is intentionally non-durable for
877    /// batching data writes within a transaction (pre-commit records).
878    ///
879    /// The group commit path (`EventDrivenGroupCommit`) is responsible for
880    /// calling `flush()` + `sync()` after batching multiple commit records.
881    pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
882        let mut writer = self.writer.lock();
883        let bytes = self.frame_for_write(entry)?;
884
885        writer.write_all(&bytes)?;
886        // Don't flush here - BufWriter will batch writes automatically.
887        // Call flush() explicitly before sync() or commit().
888        self.commit_record_ordinal();
889
890        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
891        self.bytes_since_sync
892            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
893
894        Ok(seq)
895    }
896
897    /// Serialize one entry into its on-disk frame using the CURRENT (not-yet-
898    /// advanced) AAD ordinal. MUST be called with the `writer` lock held, and the
899    /// caller MUST call [`Self::commit_record_ordinal`] only AFTER `write_all`
900    /// succeeds — so a failed or partial write never advances the in-memory
901    /// ordinal past what is actually on disk (which would desync every
902    /// subsequent record's AAD from the reader's reconstructed ordinal).
903    #[inline]
904    fn frame_for_write(&self, entry: &TxnWalEntry) -> Result<Vec<u8>> {
905        if self.encryption.is_enabled() {
906            let ord = self.records_in_file.load(Ordering::SeqCst);
907            self.encrypt_frame(&entry.body_bytes(), ord)
908        } else {
909            Ok(entry.to_bytes())
910        }
911    }
912
913    /// Advance the file-relative AAD ordinal by one record. Call ONLY after the
914    /// record's bytes have been handed to the writer (post-`write_all`).
915    #[inline]
916    fn commit_record_ordinal(&self) {
917        if self.encryption.is_enabled() {
918            self.records_in_file.fetch_add(1, Ordering::SeqCst);
919        }
920    }
921
922    /// Append entry without flushing (for batched writes)
923    ///
924    /// Caller must call flush() or sync() afterward to ensure durability.
925    #[inline]
926    pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
927        let mut writer = self.writer.lock();
928        let bytes = self.frame_for_write(entry)?;
929
930        writer.write_all(&bytes)?;
931        // No flush - let BufWriter buffer the writes
932        self.commit_record_ordinal();
933
934        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
935        self.bytes_since_sync
936            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
937
938        Ok(seq)
939    }
940
941    /// Write data without flushing (for batched writes within a transaction)
942    #[inline]
943    pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
944        let entry = TxnWalEntry::data(txn_id, key, value);
945        self.append_no_flush(&entry)
946    }
947
948    /// Write data from slices without any allocation
949    ///
950    /// This is the fastest path for writing - no intermediate Vec allocations.
951    /// Calculates CRC32 while serializing directly to BufWriter.
952    #[inline]
953    pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
954        // Use coarse timestamp (cached every ~1ms) instead of syscall per write
955        let timestamp_us = self.get_cached_timestamp();
956
957        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
958
959        let mut writer = self.writer.lock();
960
961        // Encrypted mode cannot stream field-by-field (the AEAD needs the whole
962        // body to compute its synthetic IV + tag), so materialize the body into a
963        // scratch buffer, seal it, and write one framed envelope. The plaintext
964        // path below keeps its zero-alloc streaming fast path untouched.
965        if self.encryption.is_enabled() {
966            let body = encode_record_body(WalRecordType::Data, txn_id, timestamp_us, key, value);
967            let ord = self.records_in_file.load(Ordering::SeqCst);
968            let frame = self.encrypt_frame(&body, ord)?;
969            writer.write_all(&frame)?;
970            // Advance the AAD ordinal only after the bytes are committed to the
971            // writer, so a failed write never desyncs writer/reader ordinals.
972            self.records_in_file.fetch_add(1, Ordering::SeqCst);
973            let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
974            self.bytes_since_sync
975                .fetch_add(frame.len() as u64, Ordering::Relaxed);
976            return Ok(seq);
977        }
978
979        let mut hasher = crc32fast::Hasher::new();
980
981        // Length (not included in CRC)
982        let content_len = (total_len - 4) as u32;
983        writer.write_all(&content_len.to_le_bytes())?;
984
985        // Record type
986        let record_type_byte = WalRecordType::Data as u8;
987        writer.write_all(&[record_type_byte])?;
988        hasher.update(&[record_type_byte]);
989
990        // Transaction ID
991        let txn_bytes = txn_id.to_le_bytes();
992        writer.write_all(&txn_bytes)?;
993        hasher.update(&txn_bytes);
994
995        // Timestamp
996        let ts_bytes = timestamp_us.to_le_bytes();
997        writer.write_all(&ts_bytes)?;
998        hasher.update(&ts_bytes);
999
1000        // Key length
1001        let key_len_bytes = (key.len() as u32).to_le_bytes();
1002        writer.write_all(&key_len_bytes)?;
1003        hasher.update(&key_len_bytes);
1004
1005        // Value length
1006        let val_len_bytes = (value.len() as u32).to_le_bytes();
1007        writer.write_all(&val_len_bytes)?;
1008        hasher.update(&val_len_bytes);
1009
1010        // Key data
1011        writer.write_all(key)?;
1012        hasher.update(key);
1013
1014        // Value data
1015        writer.write_all(value)?;
1016        hasher.update(value);
1017
1018        // CRC32 checksum
1019        writer.write_all(&hasher.finalize().to_le_bytes())?;
1020
1021        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
1022        self.bytes_since_sync
1023            .fetch_add(total_len as u64, Ordering::Relaxed);
1024
1025        Ok(seq)
1026    }
1027
1028    /// Flush pending writes to kernel buffer (but not to disk)
1029    pub fn flush(&self) -> Result<()> {
1030        let mut writer = self.writer.lock();
1031        writer.flush()?;
1032        Ok(())
1033    }
1034
1035    /// Append and immediately sync for durability
1036    pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
1037        let seq = self.append(entry)?;
1038        self.sync()?;
1039        Ok(seq)
1040    }
1041
1042    /// Force sync to disk.
1043    ///
1044    /// Must flush the BufWriter BEFORE fsync: `get_ref()` reaches the raw File
1045    /// and bypasses the 256 KB buffer, so without an explicit `flush()` the
1046    /// just-appended commit/checkpoint record may still sit in userspace and
1047    /// `sync_all()` would fsync stale bytes — silently breaking the durability
1048    /// guarantee `append_sync`/`commit`/`checkpoint` advertise.
1049    pub fn sync(&self) -> Result<()> {
1050        let mut writer = self.writer.lock();
1051        writer.flush()?;
1052        writer.get_ref().sync_all()?;
1053        self.bytes_since_sync.store(0, Ordering::Relaxed);
1054        Ok(())
1055    }
1056
1057    /// Flush a TxnWalBuffer with single lock acquisition
1058    ///
1059    /// This is the high-performance path for transaction commit:
1060    /// - All writes during the transaction are buffered locally
1061    /// - At commit, this method flushes everything with ONE lock
1062    ///
1063    /// ## Performance
1064    ///
1065    /// ```text
1066    /// 1000 writes with individual flush: 1000 × lock overhead
1067    /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
1068    /// Speedup: ~1000× for lock overhead
1069    /// ```
1070    #[inline]
1071    pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
1072        if buffer.is_empty() {
1073            return Ok(0);
1074        }
1075
1076        let mut writer = self.writer.lock();
1077
1078        if self.encryption.is_enabled() {
1079            // The buffer holds concatenated plaintext frames
1080            // `[content_len][body]...`. Re-frame each as an AEAD envelope bound to
1081            // its own file-relative ordinal, preserving per-record framing (so a
1082            // torn tail discards exactly one record).
1083            let buf = &buffer.buffer;
1084            let mut pos = 0usize;
1085            let mut total_written = 0u64;
1086            while pos + 4 <= buf.len() {
1087                let content_len =
1088                    u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
1089                        as usize;
1090                pos += 4;
1091                if pos + content_len > buf.len() {
1092                    return Err(SochDBError::Corruption(
1093                        "txn buffer truncated mid-record during encrypted flush".into(),
1094                    ));
1095                }
1096                let body = &buf[pos..pos + content_len];
1097                pos += content_len;
1098                let ord = self.records_in_file.load(Ordering::SeqCst);
1099                let frame = self.encrypt_frame(body, ord)?;
1100                writer.write_all(&frame)?;
1101                // Advance per record only after its bytes are committed, so a
1102                // mid-batch write failure leaves records_in_file == records
1103                // actually written (no ordinal desync for the survivors).
1104                self.records_in_file.fetch_add(1, Ordering::SeqCst);
1105                total_written += frame.len() as u64;
1106            }
1107            let seq = self
1108                .sequence
1109                .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1110            self.bytes_since_sync
1111                .fetch_add(total_written, Ordering::Relaxed);
1112            return Ok(seq);
1113        }
1114
1115        writer.write_all(&buffer.buffer)?;
1116
1117        let seq = self
1118            .sequence
1119            .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1120        self.bytes_since_sync
1121            .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
1122
1123        Ok(seq)
1124    }
1125
1126    /// Get the current size of the WAL file in bytes
1127    pub fn size_bytes(&self) -> u64 {
1128        std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
1129    }
1130
1131    /// Allocate a new transaction ID
1132    pub fn alloc_txn_id(&self) -> u64 {
1133        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1134    }
1135
1136    /// Begin a new transaction
1137    pub fn begin_transaction(&self) -> Result<u64> {
1138        let txn_id = self.alloc_txn_id();
1139        let entry = TxnWalEntry::txn_begin(txn_id);
1140        self.append(&entry)?;
1141        Ok(txn_id)
1142    }
1143
1144    /// Commit a transaction (with fsync for durability)
1145    ///
1146    /// This flushes all pending writes and then fsyncs the commit record.
1147    ///
1148    /// # Durability Guarantee
1149    ///
1150    /// After this method returns `Ok(())`, the commit record is durable on disk.
1151    /// The transaction is guaranteed to survive process crash, OS crash, and
1152    /// power failure (assuming the storage device honors fsync correctly).
1153    ///
1154    /// # Performance
1155    ///
1156    /// Each call performs an fsync (~5ms on HDD, ~0.1ms on NVMe).
1157    /// For high-throughput workloads, use `EventDrivenGroupCommit` which
1158    /// batches multiple commits into a single fsync via `commit_durable_batch()`.
1159    pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
1160        // First flush any pending buffered writes
1161        self.flush()?;
1162
1163        // Then write commit record with fsync
1164        let entry = TxnWalEntry::txn_commit(txn_id);
1165        self.append_sync(&entry)?;
1166        Ok(())
1167    }
1168
1169    /// Commit a batch of transactions with a single fsync (group commit).
1170    ///
1171    /// Writes commit records for all transaction IDs, then performs a single
1172    /// flush + fsync. This amortizes the fsync cost across N transactions,
1173    /// achieving ~N× throughput improvement over individual commits.
1174    ///
1175    /// # Durability Guarantee
1176    ///
1177    /// After this method returns `Ok(())`, ALL transactions in the batch
1178    /// are durable on disk. Either all commit records are visible after
1179    /// crash recovery, or none are (atomic batch durability).
1180    ///
1181    /// # Usage
1182    ///
1183    /// This method is called by `EventDrivenGroupCommit::flush_fn` to
1184    /// implement the group commit pattern. Do not call directly unless
1185    /// you are implementing your own commit batching.
1186    pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
1187        // Write all commit records without flushing (batch them in BufWriter)
1188        for &txn_id in txn_ids {
1189            let entry = TxnWalEntry::txn_commit(txn_id);
1190            self.append_no_flush(&entry)?;
1191        }
1192
1193        // Single flush + fsync for the entire batch
1194        self.flush()?;
1195        self.sync()?;
1196        Ok(())
1197    }
1198
1199    /// Abort a transaction
1200    pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
1201        let entry = TxnWalEntry::txn_abort(txn_id);
1202        self.append(&entry)?;
1203        Ok(())
1204    }
1205
1206    /// Write data within a transaction
1207    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1208        let entry = TxnWalEntry::data(txn_id, key, value);
1209        self.append(&entry)
1210    }
1211
1212    /// Replay WAL for crash recovery
1213    ///
1214    /// Returns (committed_writes, recovered_txn_count)
1215    #[allow(clippy::type_complexity)]
1216    pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
1217        let file = File::open(&self.path)?;
1218        let mut reader = BufReader::new(file);
1219
1220        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1221            std::collections::HashMap::new();
1222        let mut result = Vec::new();
1223        let mut txn_count = 0;
1224        let mut ordinal: u64 = 0;
1225
1226        // Single pass in WAL order. Transaction ids are process-local and
1227        // short-lived CLI processes can reuse the same ids after restart, so
1228        // grouping the entire WAL by txn_id would merge unrelated transactions
1229        // and replay older writes after newer ones.
1230        loop {
1231            match self.read_record(&mut reader, &mut ordinal) {
1232                Ok(entry) => match entry.record_type {
1233                    WalRecordType::TxnBegin => {
1234                        pending_writes.insert(entry.txn_id, Vec::new());
1235                    }
1236                    WalRecordType::Data => {
1237                        // Accept data for any txn_id we've seen a Begin for,
1238                        // and also for txn_ids without a Begin (they might have
1239                        // their Begin later in the WAL due to buffered writes).
1240                        pending_writes
1241                            .entry(entry.txn_id)
1242                            .or_insert_with(Vec::new)
1243                            .push((entry.key, entry.value));
1244                    }
1245                    WalRecordType::TxnCommit => {
1246                        if let Some(writes) = pending_writes.remove(&entry.txn_id) {
1247                            result.extend(writes);
1248                            txn_count += 1;
1249                        }
1250                    }
1251                    WalRecordType::TxnAbort => {
1252                        pending_writes.remove(&entry.txn_id);
1253                    }
1254                    _ => {}
1255                },
1256                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1257                    break;
1258                }
1259                Err(e) => {
1260                    // Encrypted WALs must fail loud on a non-EOF error so a wrong
1261                    // key / tamper / corruption can never masquerade as EOF and
1262                    // silently drop committed data. (Wrong key is already excluded
1263                    // earlier by the keyring canary, so this is genuine corruption
1264                    // or tampering.) Plaintext keeps the legacy torn-tail tolerance.
1265                    if self.encryption.is_enabled() {
1266                        return Err(e);
1267                    }
1268                    break;
1269                }
1270            }
1271        }
1272
1273        Ok((result, txn_count))
1274    }
1275
1276    /// Replay the WAL up to a PITR target, returning the committed writes whose
1277    /// transactions are included by the target (and the count of such txns).
1278    ///
1279    /// Prefix semantics — recovery "rolls forward" through the WAL and STOPS at
1280    /// the target, so the result is always a state the database actually passed
1281    /// through:
1282    /// - [`RecoveryTarget::Lsn`]`(l)`: include the first `l` WAL records (exact;
1283    ///   `l` matches [`Self::sequence`] / `DurableStorage::current_lsn` captured
1284    ///   at that point). A transaction whose commit lands after record `l` is
1285    ///   excluded (atomic — partial transactions are never applied).
1286    /// - [`RecoveryTarget::Timestamp`]`(t)`: stop at the FIRST transaction whose
1287    ///   commit timestamp exceeds `t`. Best-effort on the coarse, possibly
1288    ///   non-monotonic commit clock; prefer `Lsn` for an exact cut.
1289    ///
1290    /// Crypto-aware and fail-loud, exactly like [`Self::replay_for_recovery`].
1291    #[allow(clippy::type_complexity)]
1292    pub fn replay_to_target(
1293        &self,
1294        target: RecoveryTarget,
1295    ) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
1296        let file = File::open(&self.path)?;
1297        let mut reader = BufReader::new(file);
1298
1299        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1300            std::collections::HashMap::new();
1301        let mut result = Vec::new();
1302        let mut txn_count = 0;
1303        let mut ordinal: u64 = 0;
1304
1305        loop {
1306            match self.read_record(&mut reader, &mut ordinal) {
1307                Ok(entry) => {
1308                    // LSN target: `ordinal` is now this record's 1-based LSN.
1309                    // Stop once we pass the target (this record is excluded).
1310                    if let RecoveryTarget::Lsn(l) = target {
1311                        if ordinal > l {
1312                            break;
1313                        }
1314                    }
1315                    match entry.record_type {
1316                        WalRecordType::TxnBegin => {
1317                            pending_writes.insert(entry.txn_id, Vec::new());
1318                        }
1319                        WalRecordType::Data => {
1320                            pending_writes
1321                                .entry(entry.txn_id)
1322                                .or_insert_with(Vec::new)
1323                                .push((entry.key, entry.value));
1324                        }
1325                        WalRecordType::TxnCommit => {
1326                            // Timestamp target: stop rolling forward at the first
1327                            // transaction committed AFTER the target (exclude it).
1328                            if let RecoveryTarget::Timestamp(t) = target {
1329                                if entry.timestamp_us > t {
1330                                    break;
1331                                }
1332                            }
1333                            if let Some(writes) = pending_writes.remove(&entry.txn_id) {
1334                                result.extend(writes);
1335                                txn_count += 1;
1336                            }
1337                        }
1338                        WalRecordType::TxnAbort => {
1339                            pending_writes.remove(&entry.txn_id);
1340                        }
1341                        _ => {}
1342                    }
1343                }
1344                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1345                    break;
1346                }
1347                Err(e) => {
1348                    if self.encryption.is_enabled() {
1349                        return Err(e);
1350                    }
1351                    break;
1352                }
1353            }
1354        }
1355
1356        Ok((result, txn_count))
1357    }
1358
1359    /// Replay WAL with a callback
1360    pub fn replay<F>(&self, mut callback: F) -> Result<u64>
1361    where
1362        F: FnMut(TxnWalEntry) -> Result<()>,
1363    {
1364        let file = File::open(&self.path)?;
1365        let mut reader = BufReader::new(file);
1366        let mut count = 0u64;
1367        let mut ordinal: u64 = 0;
1368
1369        loop {
1370            match self.read_record(&mut reader, &mut ordinal) {
1371                Ok(entry) => {
1372                    callback(entry)?;
1373                    count += 1;
1374                }
1375                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1376                    break;
1377                }
1378                Err(e) => {
1379                    // Encrypted: fail loud (never silently drop committed data on a
1380                    // non-EOF error). Plaintext: tolerate a trailing incomplete entry.
1381                    if self.encryption.is_enabled() {
1382                        return Err(e);
1383                    }
1384                    eprintln!("WAL replay warning: {:?}", e);
1385                    break;
1386                }
1387            }
1388        }
1389
1390        Ok(count)
1391    }
1392
1393    /// Truncate WAL (called after successful checkpoint)
1394    ///
1395    /// Flushes any buffered writes, truncates the file to 0 bytes,
1396    /// and resets sequence counters. The file is opened in `O_APPEND`
1397    /// mode so subsequent writes will correctly start at offset 0.
1398    ///
1399    /// **WARNING**: After truncation, all data durability is lost.
1400    /// The in-memory memtable still holds data for the current session,
1401    /// but a crash after truncation means the data cannot be recovered
1402    /// from the WAL.
1403    pub fn truncate(&self) -> Result<()> {
1404        let mut writer = self.writer.lock();
1405        // Flush BufWriter so no stale data is written after truncation
1406        writer.flush()?;
1407        let file = writer.get_ref();
1408        file.set_len(0)?;
1409        file.sync_all()?;
1410        self.sequence.store(0, Ordering::SeqCst);
1411        self.bytes_since_sync.store(0, Ordering::Relaxed);
1412        // The file restarts at offset 0, so per-record AAD ordinals restart too;
1413        // a fresh reader of the truncated file will count from 0 to match.
1414        self.records_in_file.store(0, Ordering::SeqCst);
1415        Ok(())
1416    }
1417
1418    /// Write a checkpoint marker
1419    pub fn write_checkpoint(&self) -> Result<u64> {
1420        let entry = TxnWalEntry::checkpoint(0);
1421        self.append_sync(&entry)
1422    }
1423
1424    /// Write a Compensation Log Record (CLR) for undo operations
1425    ///
1426    /// CLRs are redo-only records that skip past already undone operations
1427    /// during recovery. The undo_next_lsn field tells recovery where to
1428    /// continue undoing after this CLR.
1429    pub fn append_clr(
1430        &self,
1431        txn_id: u64,
1432        _original_lsn: u64,
1433        undo_next_lsn: Option<u64>,
1434        undo_data: &[u8],
1435    ) -> Result<u64> {
1436        // Encode undo_next_lsn into the key field
1437        let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1438        let entry = TxnWalEntry {
1439            record_type: WalRecordType::CompensationLogRecord,
1440            txn_id,
1441            timestamp_us: TxnWalEntry::now_us(),
1442            key, // undo_next_lsn encoded in key
1443            value: undo_data.to_vec(),
1444        };
1445        self.append(&entry)
1446    }
1447
1448    /// Write checkpoint with data (for fuzzy checkpoints)
1449    pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1450        let entry = TxnWalEntry {
1451            record_type: WalRecordType::Checkpoint,
1452            txn_id: 0,
1453            timestamp_us: TxnWalEntry::now_us(),
1454            key: Vec::new(),
1455            value: checkpoint_data.to_vec(),
1456        };
1457        self.append_sync(&entry)
1458    }
1459
1460    /// Write checkpoint end with captured state
1461    pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1462        let entry = TxnWalEntry {
1463            record_type: WalRecordType::CheckpointEnd,
1464            txn_id: 0,
1465            timestamp_us: TxnWalEntry::now_us(),
1466            key: Vec::new(),
1467            value: checkpoint_data.to_vec(),
1468        };
1469        self.append_sync(&entry)
1470    }
1471
1472    /// Get current sequence number
1473    pub fn sequence(&self) -> u64 {
1474        self.sequence.load(Ordering::SeqCst)
1475    }
1476
1477    /// Get bytes written since last sync
1478    pub fn bytes_since_sync(&self) -> u64 {
1479        self.bytes_since_sync.load(Ordering::Relaxed)
1480    }
1481
1482    /// Get path to WAL file
1483    pub fn path(&self) -> &Path {
1484        &self.path
1485    }
1486}
1487
1488/// Statistics about WAL state
1489#[derive(Debug, Clone, Default)]
1490pub struct TxnWalStats {
1491    /// Number of entries written
1492    pub entries_written: u64,
1493    /// Bytes written since last sync
1494    pub bytes_since_sync: u64,
1495    /// Current transaction ID counter
1496    pub next_txn_id: u64,
1497}
1498
1499// ============================================================================
1500// Sharded WAL for Reduced Mutex Contention
1501// ============================================================================
1502
1503/// Sharded Write-Ahead Log for high-concurrency workloads
1504///
1505/// Instead of a single Mutex<File>, uses multiple shards to reduce contention:
1506/// - Writers hash to shard by txn_id
1507/// - Each shard has its own buffer
1508/// - Central coordinator handles fsync ordering
1509///
1510/// Reduces contention from O(1) bottleneck to O(num_shards) parallelism.
1511#[allow(dead_code)]
1512pub struct ShardedWal {
1513    /// Shard writers (txn_id % num_shards selects shard)
1514    shards: Vec<parking_lot::Mutex<WalShard>>,
1515    /// Number of shards (power of 2)
1516    num_shards: usize,
1517    /// Central WAL file for ordered writes
1518    central_writer: parking_lot::Mutex<BufWriter<File>>,
1519    /// Next transaction ID
1520    next_txn_id: AtomicU64,
1521    /// Write sequence (global ordering)
1522    sequence: AtomicU64,
1523    /// Path
1524    path: PathBuf,
1525}
1526
1527/// Individual WAL shard buffer
1528struct WalShard {
1529    /// Buffered entries for this shard
1530    buffer: Vec<u8>,
1531    /// Number of entries buffered
1532    entry_count: usize,
1533}
1534
1535impl WalShard {
1536    fn new() -> Self {
1537        Self {
1538            buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1539            entry_count: 0,
1540        }
1541    }
1542
1543    fn append(&mut self, entry: &TxnWalEntry) {
1544        let bytes = entry.to_bytes();
1545        self.buffer.extend_from_slice(&bytes);
1546        self.entry_count += 1;
1547    }
1548
1549    fn is_empty(&self) -> bool {
1550        self.buffer.is_empty()
1551    }
1552
1553    fn drain(&mut self) -> Vec<u8> {
1554        self.entry_count = 0;
1555        std::mem::take(&mut self.buffer)
1556    }
1557}
1558
1559impl ShardedWal {
1560    /// Create sharded WAL with specified number of shards
1561    ///
1562    /// Recommended: 4-8 shards for typical server workloads
1563    pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1564        let path = path.as_ref().to_path_buf();
1565
1566        if let Some(parent) = path.parent() {
1567            std::fs::create_dir_all(parent)?;
1568        }
1569
1570        let file = std::fs::OpenOptions::new()
1571            .create(true)
1572            .append(true)
1573            .read(true)
1574            .open(&path)?;
1575
1576        // Round up to power of 2 for fast modulo
1577        let num_shards = num_shards.next_power_of_two();
1578        let shards: Vec<_> = (0..num_shards)
1579            .map(|_| parking_lot::Mutex::new(WalShard::new()))
1580            .collect();
1581
1582        Ok(Self {
1583            shards,
1584            num_shards,
1585            central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1586            next_txn_id: AtomicU64::new(1),
1587            sequence: AtomicU64::new(0),
1588            path,
1589        })
1590    }
1591
1592    /// Get shard index for transaction
1593    #[inline]
1594    fn shard_idx(&self, txn_id: u64) -> usize {
1595        (txn_id as usize) & (self.num_shards - 1)
1596    }
1597
1598    /// Append entry to appropriate shard (lock-free for different txns)
1599    pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1600        let shard_idx = self.shard_idx(entry.txn_id);
1601        let mut shard = self.shards[shard_idx].lock();
1602        shard.append(entry);
1603        self.sequence.fetch_add(1, Ordering::SeqCst)
1604    }
1605
1606    /// Allocate transaction ID
1607    pub fn alloc_txn_id(&self) -> u64 {
1608        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1609    }
1610
1611    /// Flush all shard buffers to central file
1612    pub fn flush(&self) -> Result<()> {
1613        let mut central = self.central_writer.lock();
1614
1615        // Collect all shard buffers (brief lock per shard)
1616        for shard in &self.shards {
1617            let mut shard_guard = shard.lock();
1618            if !shard_guard.is_empty() {
1619                let data = shard_guard.drain();
1620                central.write_all(&data)?;
1621            }
1622        }
1623
1624        central.flush()?;
1625        Ok(())
1626    }
1627
1628    /// Sync to disk (fsync)
1629    pub fn sync(&self) -> Result<()> {
1630        self.flush()?;
1631        let central = self.central_writer.lock();
1632        central.get_ref().sync_all()?;
1633        Ok(())
1634    }
1635
1636    /// Begin transaction
1637    pub fn begin_transaction(&self) -> Result<u64> {
1638        let txn_id = self.alloc_txn_id();
1639        let entry = TxnWalEntry::txn_begin(txn_id);
1640        self.append(&entry);
1641        Ok(txn_id)
1642    }
1643
1644    /// Write data
1645    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1646        let entry = TxnWalEntry::data(txn_id, key, value);
1647        Ok(self.append(&entry))
1648    }
1649
1650    /// Commit transaction
1651    pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1652        let entry = TxnWalEntry::txn_commit(txn_id);
1653        let seq = self.append(&entry);
1654        self.sync()?; // Fsync on commit for durability
1655        Ok(seq)
1656    }
1657
1658    /// Get statistics
1659    pub fn stats(&self) -> ShardedWalStats {
1660        let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1661        for shard in &self.shards {
1662            shard_entry_counts.push(shard.lock().entry_count);
1663        }
1664
1665        ShardedWalStats {
1666            num_shards: self.num_shards,
1667            total_entries: self.sequence.load(Ordering::SeqCst),
1668            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1669            shard_entry_counts,
1670        }
1671    }
1672}
1673
1674/// Statistics for sharded WAL
1675#[derive(Debug, Clone)]
1676pub struct ShardedWalStats {
1677    pub num_shards: usize,
1678    pub total_entries: u64,
1679    pub next_txn_id: u64,
1680    pub shard_entry_counts: Vec<usize>,
1681}
1682
1683/// Detailed crash recovery statistics
1684#[derive(Debug, Clone, Default)]
1685pub struct CrashRecoveryStats {
1686    /// Total records read from WAL
1687    pub total_records: u64,
1688    /// Number of committed transactions
1689    pub committed_txns: u64,
1690    /// Number of uncommitted (rolled back) transactions
1691    pub rolled_back_txns: u64,
1692    /// Number of explicitly aborted transactions
1693    pub aborted_txns: u64,
1694    /// Number of data writes recovered
1695    pub recovered_writes: u64,
1696    /// Number of torn/corrupted records at end (expected on crash)
1697    pub torn_records: u64,
1698    /// Bytes read from WAL
1699    pub bytes_read: u64,
1700    /// Recovery duration in microseconds
1701    pub recovery_duration_us: u64,
1702    /// Highest transaction ID seen (for restarting counter)
1703    pub max_txn_id: u64,
1704}
1705
1706impl TxnWal {
1707    /// Get WAL statistics
1708    pub fn stats(&self) -> TxnWalStats {
1709        TxnWalStats {
1710            entries_written: self.sequence.load(Ordering::SeqCst),
1711            bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1712            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1713        }
1714    }
1715
1716    /// Full crash recovery with detailed statistics
1717    ///
1718    /// This method provides ACID recovery guarantees:
1719    /// 1. **Atomicity**: Uncommitted transactions are rolled back
1720    /// 2. **Durability**: All committed transactions are replayed
1721    /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1722    ///
1723    /// Returns (committed_writes, stats) where committed_writes contains
1724    /// all key-value pairs from committed transactions in order.
1725    #[allow(clippy::type_complexity)]
1726    pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1727        let start_time = std::time::Instant::now();
1728        let file = File::open(&self.path)?;
1729        let file_size = file.metadata()?.len();
1730        let mut reader = BufReader::new(file);
1731
1732        let mut stats = CrashRecoveryStats {
1733            bytes_read: file_size,
1734            ..Default::default()
1735        };
1736
1737        let mut committed_txns: HashSet<u64> = HashSet::new();
1738        let mut aborted_txns: HashSet<u64> = HashSet::new();
1739        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1740            std::collections::HashMap::new();
1741        let mut all_txns: HashSet<u64> = HashSet::new();
1742
1743        // Read all records, stopping at first corruption (torn write)
1744        let mut ordinal: u64 = 0;
1745        loop {
1746            match self.read_record(&mut reader, &mut ordinal) {
1747                Ok(entry) => {
1748                    stats.total_records += 1;
1749                    if entry.txn_id > stats.max_txn_id {
1750                        stats.max_txn_id = entry.txn_id;
1751                    }
1752
1753                    match entry.record_type {
1754                        WalRecordType::TxnBegin => {
1755                            pending_writes.insert(entry.txn_id, Vec::new());
1756                            all_txns.insert(entry.txn_id);
1757                        }
1758                        WalRecordType::Data => {
1759                            if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1760                                writes.push((entry.key, entry.value));
1761                            }
1762                        }
1763                        WalRecordType::TxnCommit => {
1764                            committed_txns.insert(entry.txn_id);
1765                        }
1766                        WalRecordType::TxnAbort => {
1767                            pending_writes.remove(&entry.txn_id);
1768                            aborted_txns.insert(entry.txn_id);
1769                        }
1770                        _ => {}
1771                    }
1772                }
1773                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1774                    // Clean EOF
1775                    break;
1776                }
1777                Err(e) => {
1778                    // Encrypted: an AEAD auth failure / tamper / corruption is a
1779                    // hard error — abort recovery rather than silently truncating
1780                    // committed data (wrong key is already caught by the keyring
1781                    // canary at open). Plaintext: treat trailing corruption as a
1782                    // torn write and stop.
1783                    if self.encryption.is_enabled() {
1784                        return Err(e);
1785                    }
1786                    stats.torn_records += 1;
1787                    break;
1788                }
1789            }
1790        }
1791
1792        // Collect committed writes
1793        let mut result = Vec::new();
1794        for (txn_id, writes) in &pending_writes {
1795            if committed_txns.contains(txn_id) {
1796                stats.committed_txns += 1;
1797                stats.recovered_writes += writes.len() as u64;
1798                result.extend(writes.clone());
1799            }
1800        }
1801
1802        // Count aborted and rolled-back transactions
1803        stats.aborted_txns = aborted_txns.len() as u64;
1804        stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1805
1806        stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1807
1808        Ok((result, stats))
1809    }
1810}
1811
1812#[cfg(test)]
1813mod tests {
1814    use super::*;
1815    use tempfile::tempdir;
1816
1817    #[test]
1818    fn test_wal_entry_roundtrip() {
1819        let entry = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
1820        let bytes = entry.to_bytes();
1821
1822        let mut cursor = std::io::Cursor::new(bytes);
1823        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
1824
1825        assert_eq!(recovered.record_type, WalRecordType::Data);
1826        assert_eq!(recovered.txn_id, 42);
1827        assert_eq!(recovered.key, b"key");
1828        assert_eq!(recovered.value, b"value");
1829    }
1830
1831    #[test]
1832    fn test_wal_append_and_replay() {
1833        let dir = tempdir().unwrap();
1834        let wal_path = dir.path().join("test.wal");
1835
1836        // Write some entries
1837        {
1838            let wal = TxnWal::new(&wal_path).unwrap();
1839            let txn_id = wal.begin_transaction().unwrap();
1840            wal.write(txn_id, b"k1".to_vec(), b"v1".to_vec()).unwrap();
1841            wal.write(txn_id, b"k2".to_vec(), b"v2".to_vec()).unwrap();
1842            wal.commit_transaction(txn_id).unwrap();
1843        }
1844
1845        // Replay and verify
1846        {
1847            let wal = TxnWal::new(&wal_path).unwrap();
1848            let (writes, txn_count) = wal.replay_for_recovery().unwrap();
1849
1850            assert_eq!(txn_count, 1);
1851            assert_eq!(writes.len(), 2);
1852            assert_eq!(writes[0], (b"k1".to_vec(), b"v1".to_vec()));
1853            assert_eq!(writes[1], (b"k2".to_vec(), b"v2".to_vec()));
1854        }
1855    }
1856
1857    #[test]
1858    fn test_uncommitted_transaction_rollback() {
1859        let dir = tempdir().unwrap();
1860        let wal_path = dir.path().join("test.wal");
1861
1862        // Write committed and uncommitted transactions
1863        {
1864            let wal = TxnWal::new(&wal_path).unwrap();
1865
1866            // Committed transaction
1867            let txn1 = wal.begin_transaction().unwrap();
1868            wal.write(txn1, b"committed".to_vec(), b"yes".to_vec())
1869                .unwrap();
1870            wal.commit_transaction(txn1).unwrap();
1871
1872            // Uncommitted transaction (simulates crash)
1873            let txn2 = wal.begin_transaction().unwrap();
1874            wal.write(txn2, b"uncommitted".to_vec(), b"no".to_vec())
1875                .unwrap();
1876            // No commit!
1877        }
1878
1879        // Replay - uncommitted should be rolled back
1880        {
1881            let wal = TxnWal::new(&wal_path).unwrap();
1882            let (writes, txn_count) = wal.replay_for_recovery().unwrap();
1883
1884            assert_eq!(txn_count, 1); // Only committed transaction
1885            assert_eq!(writes.len(), 1);
1886            assert_eq!(writes[0], (b"committed".to_vec(), b"yes".to_vec()));
1887        }
1888    }
1889
1890    #[test]
1891    fn test_aborted_transaction() {
1892        let dir = tempdir().unwrap();
1893        let wal_path = dir.path().join("test.wal");
1894
1895        {
1896            let wal = TxnWal::new(&wal_path).unwrap();
1897
1898            let txn = wal.begin_transaction().unwrap();
1899            wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
1900                .unwrap();
1901            wal.abort_transaction(txn).unwrap();
1902        }
1903
1904        {
1905            let wal = TxnWal::new(&wal_path).unwrap();
1906            let (writes, txn_count) = wal.replay_for_recovery().unwrap();
1907
1908            assert_eq!(txn_count, 0);
1909            assert!(writes.is_empty());
1910        }
1911    }
1912
1913    #[test]
1914    fn test_checksum_validation() {
1915        let entry = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec());
1916        let mut bytes = entry.to_bytes();
1917
1918        // Corrupt the checksum
1919        let len = bytes.len();
1920        bytes[len - 1] ^= 0xFF;
1921
1922        let mut cursor = std::io::Cursor::new(bytes);
1923        let result = TxnWalEntry::from_reader(&mut cursor);
1924
1925        assert!(result.is_err());
1926    }
1927
1928    #[test]
1929    fn test_crash_recovery_with_stats() {
1930        let dir = tempdir().unwrap();
1931        let wal_path = dir.path().join("test.wal");
1932
1933        // Simulate complex workload
1934        {
1935            let wal = TxnWal::new(&wal_path).unwrap();
1936
1937            // Committed transaction 1
1938            let txn1 = wal.begin_transaction().unwrap();
1939            wal.write(txn1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
1940            wal.write(txn1, b"k2".to_vec(), b"v2".to_vec()).unwrap();
1941            wal.commit_transaction(txn1).unwrap();
1942
1943            // Aborted transaction
1944            let txn2 = wal.begin_transaction().unwrap();
1945            wal.write(txn2, b"aborted_key".to_vec(), b"aborted_val".to_vec())
1946                .unwrap();
1947            wal.abort_transaction(txn2).unwrap();
1948
1949            // Committed transaction 2
1950            let txn3 = wal.begin_transaction().unwrap();
1951            wal.write(txn3, b"k3".to_vec(), b"v3".to_vec()).unwrap();
1952            wal.commit_transaction(txn3).unwrap();
1953
1954            // Uncommitted transaction (simulates crash)
1955            let txn4 = wal.begin_transaction().unwrap();
1956            wal.write(txn4, b"uncommitted".to_vec(), b"data".to_vec())
1957                .unwrap();
1958            // No commit - simulates crash
1959        }
1960
1961        // Recover and verify
1962        {
1963            let wal = TxnWal::new(&wal_path).unwrap();
1964            let (writes, stats) = wal.crash_recovery().unwrap();
1965
1966            // Should have 3 writes from 2 committed transactions
1967            assert_eq!(writes.len(), 3);
1968            assert_eq!(stats.committed_txns, 2);
1969            assert_eq!(stats.aborted_txns, 1);
1970            assert_eq!(stats.rolled_back_txns, 1); // txn4 was uncommitted
1971            assert_eq!(stats.recovered_writes, 3);
1972            assert!(stats.recovery_duration_us > 0);
1973        }
1974    }
1975
1976    #[test]
1977    fn test_torn_write_detection() {
1978        let dir = tempdir().unwrap();
1979        let wal_path = dir.path().join("test.wal");
1980
1981        // Write a valid transaction
1982        {
1983            let wal = TxnWal::new(&wal_path).unwrap();
1984            let txn = wal.begin_transaction().unwrap();
1985            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
1986            wal.commit_transaction(txn).unwrap();
1987        }
1988
1989        // Append corrupted bytes to simulate torn write
1990        {
1991            use std::io::Write;
1992            let mut file = std::fs::OpenOptions::new()
1993                .append(true)
1994                .open(&wal_path)
1995                .unwrap();
1996            // Write partial record (torn write)
1997            file.write_all(&[0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF])
1998                .unwrap();
1999        }
2000
2001        // Recovery should still work, detecting torn write
2002        {
2003            let wal = TxnWal::new(&wal_path).unwrap();
2004            let (writes, stats) = wal.crash_recovery().unwrap();
2005
2006            // Should recover the valid transaction
2007            assert_eq!(writes.len(), 1);
2008            assert_eq!(stats.committed_txns, 1);
2009            assert_eq!(stats.torn_records, 1);
2010        }
2011    }
2012
2013    #[test]
2014    fn test_crc32_determinism() {
2015        // Verify CRC32 produces consistent checksums for same content
2016        let mut entry1 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
2017        entry1.timestamp_us = 12345; // Fixed timestamp for determinism
2018
2019        let mut entry2 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
2020        entry2.timestamp_us = 12345; // Same timestamp
2021
2022        assert_eq!(entry1.checksum(), entry2.checksum());
2023
2024        // Different content should produce different checksum
2025        let mut entry3 = TxnWalEntry::data(42, b"key".to_vec(), b"different".to_vec());
2026        entry3.timestamp_us = 12345;
2027        assert_ne!(entry1.checksum(), entry3.checksum());
2028
2029        // Verify roundtrip preserves checksum
2030        let bytes = entry1.to_bytes();
2031        let mut cursor = std::io::Cursor::new(bytes);
2032        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
2033        assert_eq!(recovered.checksum(), entry1.checksum());
2034    }
2035}
2036
2037#[cfg(test)]
2038mod encryption_wal_tests {
2039    use super::*;
2040    use crate::encryption::EncryptionEngine;
2041    use tempfile::tempdir;
2042
2043    fn enc(key: u8) -> Arc<EncryptionEngine> {
2044        Arc::new(EncryptionEngine::new(&[key; 32]))
2045    }
2046    const UUID: [u8; 16] = [9u8; 16];
2047
2048    /// Write a committed txn (begin/data/commit) and one aborted txn via the
2049    /// encrypted paths, then reopen with the same key and crash-recover.
2050    #[test]
2051    fn encrypted_write_then_recover_roundtrip() {
2052        let dir = tempdir().unwrap();
2053        let path = dir.path().join("enc.wal");
2054
2055        {
2056            let wal = TxnWal::new_with_encryption(&path, enc(7), UUID, 0).unwrap();
2057            // committed txn via append + write_no_flush_refs + flush_buffer paths
2058            let t1 = wal.begin_transaction().unwrap();
2059            wal.write(t1, b"alpha".to_vec(), b"one".to_vec()).unwrap();
2060            wal.write_no_flush_refs(t1, b"beta", b"two").unwrap();
2061            // also exercise the buffer/flush_buffer batch path
2062            let mut buf = TxnWalBuffer::new(t1);
2063            buf.append(b"gamma", b"three");
2064            wal.flush_buffer(&buf).unwrap();
2065            wal.commit_transaction(t1).unwrap();
2066
2067            // aborted txn must NOT survive
2068            let t2 = wal.begin_transaction().unwrap();
2069            wal.write(t2, b"ghost".to_vec(), b"x".to_vec()).unwrap();
2070            wal.abort_transaction(t2).unwrap();
2071            wal.sync().unwrap();
2072        }
2073
2074        // On-disk bytes must NOT contain the plaintext values.
2075        let raw = std::fs::read(&path).unwrap();
2076        assert!(!contains(&raw, b"alpha"));
2077        assert!(!contains(&raw, b"three"));
2078
2079        let wal = TxnWal::new_with_encryption(&path, enc(7), UUID, 0).unwrap();
2080        let (writes, stats) = wal.crash_recovery().unwrap();
2081        let keys: Vec<_> = writes.iter().map(|(k, _)| k.clone()).collect();
2082        assert!(keys.contains(&b"alpha".to_vec()));
2083        assert!(keys.contains(&b"beta".to_vec()));
2084        assert!(keys.contains(&b"gamma".to_vec()));
2085        assert!(!keys.contains(&b"ghost".to_vec()), "aborted txn leaked");
2086        assert_eq!(stats.committed_txns, 1);
2087    }
2088
2089    /// Wrong key at the WAL layer must FAIL LOUD, not silently return empty.
2090    /// (In production the keyring canary catches this even earlier; this proves
2091    /// the replay path itself never swallows an AEAD failure as EOF.)
2092    #[test]
2093    fn wrong_key_fails_loud_not_empty() {
2094        let dir = tempdir().unwrap();
2095        let path = dir.path().join("enc.wal");
2096        {
2097            let wal = TxnWal::new_with_encryption(&path, enc(1), UUID, 0).unwrap();
2098            let t = wal.begin_transaction().unwrap();
2099            wal.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
2100            wal.commit_transaction(t).unwrap();
2101            wal.sync().unwrap();
2102        }
2103        // Reopen with the WRONG key: recover_state runs in the constructor and
2104        // must surface a hard error rather than an "empty" success.
2105        let opened = TxnWal::new_with_encryption(&path, enc(2), UUID, 0);
2106        assert!(opened.is_err(), "wrong key opened silently (data-loss bug)");
2107        match opened.err().unwrap() {
2108            SochDBError::Encryption(_) => {}
2109            other => panic!("expected Encryption error, got {other:?}"),
2110        }
2111    }
2112
2113    /// Tampering with a committed encrypted record must fail recovery loud.
2114    #[test]
2115    fn tamper_midstream_fails_loud() {
2116        let dir = tempdir().unwrap();
2117        let path = dir.path().join("enc.wal");
2118        {
2119            let wal = TxnWal::new_with_encryption(&path, enc(5), UUID, 0).unwrap();
2120            let t = wal.begin_transaction().unwrap();
2121            wal.write(t, b"k1".to_vec(), b"v1".to_vec()).unwrap();
2122            wal.commit_transaction(t).unwrap();
2123            wal.sync().unwrap();
2124        }
2125        // Flip a byte well inside the file (not the final torn-tail region).
2126        let mut raw = std::fs::read(&path).unwrap();
2127        let mid = raw.len() / 2;
2128        raw[mid] ^= 0xFF;
2129        std::fs::write(&path, &raw).unwrap();
2130
2131        let opened = TxnWal::new_with_encryption(&path, enc(5), UUID, 0);
2132        // Either the constructor's recover_state errors, or a later crash_recovery
2133        // does — but it must NEVER silently accept tampered ciphertext.
2134        let failed = opened.is_err()
2135            || opened
2136                .ok()
2137                .map(|w| w.crash_recovery().is_err())
2138                .unwrap_or(true);
2139        assert!(failed, "tampered encrypted WAL was silently accepted");
2140    }
2141
2142    /// The disabled engine must write a record's bytes EXACTLY as the legacy
2143    /// `to_bytes()` frame (no added encryption framing / format drift), while the
2144    /// enabled engine must NOT (it adds the AEAD envelope and is unreadable by the
2145    /// plaintext reader). Records embed wall-clock timestamps, so we compare a
2146    /// single fixed entry object against its own `to_bytes()` for determinism.
2147    #[test]
2148    fn disabled_engine_is_byte_identical_to_plaintext() {
2149        let dir = tempdir().unwrap();
2150
2151        let entry = TxnWalEntry::data(42, b"k1".to_vec(), b"v1".to_vec());
2152        let golden = entry.to_bytes();
2153
2154        // Disabled engine: file bytes == to_bytes() exactly.
2155        let p_dis = dir.path().join("disabled.wal");
2156        {
2157            let wal = TxnWal::new_with_encryption(
2158                &p_dis,
2159                Arc::new(EncryptionEngine::disabled()),
2160                [0u8; 16],
2161                0,
2162            )
2163            .unwrap();
2164            wal.append(&entry).unwrap();
2165            wal.sync().unwrap();
2166        }
2167        assert_eq!(
2168            std::fs::read(&p_dis).unwrap(),
2169            golden,
2170            "disabled-engine append diverged from legacy plaintext frame"
2171        );
2172
2173        // Enabled engine: file bytes differ and are NOT plaintext-parseable.
2174        let p_enc = dir.path().join("enc.wal");
2175        {
2176            let wal = TxnWal::new_with_encryption(&p_enc, enc(3), UUID, 0).unwrap();
2177            wal.append(&entry).unwrap();
2178            wal.sync().unwrap();
2179        }
2180        let enc_bytes = std::fs::read(&p_enc).unwrap();
2181        assert_ne!(enc_bytes, golden);
2182        let mut cur = std::io::Cursor::new(&enc_bytes);
2183        assert!(
2184            TxnWalEntry::from_reader(&mut cur).is_err()
2185                || cur.position() as usize != enc_bytes.len(),
2186            "ciphertext frame must not parse cleanly as a plaintext record"
2187        );
2188    }
2189
2190    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
2191        haystack.windows(needle.len()).any(|w| w == needle)
2192    }
2193}