Skip to main content

sochdb_storage/
txn_wal.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! Transaction-Aware WAL for ACID Transactions
//!
//! This WAL implementation provides ACID transaction support:
//! - Atomicity: All writes in a transaction are logged together
//! - Consistency: Schema validation before commit
//! - Isolation: Transaction IDs for MVCC
//! - Durability: fsync after commit
//!
//! ## WAL Record Types
//!
//! - Data: Key-value write
//! - TxnBegin: Start of transaction
//! - TxnCommit: Commit point (durability guarantee)
//! - TxnAbort: Transaction rollback
//! - Checkpoint: Snapshot marker for recovery
//! - SchemaChange: DDL operations
//!
//! ## Record Format (Fixed Layout for Torn Write Detection)
//!
//! All integers are little-endian. The Length field counts every byte that
//! follows it (Type through CRC32). The CRC32 covers Type through Value, but
//! not the Length field itself.
//!
//! ```text
//! ┌───────────┬──────────┬─────────┬───────────┬─────────┬───────────┬─────────┬─────────┬──────────┐
//! │ Length(4) │ Type(1)  │ TxnId(8)│ Timestamp │ KeyLen  │ ValueLen  │ Key(*)  │ Value(*)│ CRC32(4) │
//! │           │          │         │   (8)     │  (4)    │   (4)     │         │         │          │
//! └───────────┴──────────┴─────────┴───────────┴─────────┴───────────┴─────────┴─────────┴──────────┘
//! ```
//!
//! ## Crash Recovery Guarantees
//!
//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
//! 2. **Atomic Commit**: Commit record with fsync ensures durability
//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
50
51use byteorder::{LittleEndian, ReadBytesExt};
52use parking_lot::Mutex;
53use sochdb_core::{Result, SochDBError, WalRecordType};
54use std::cell::Cell;
55use std::collections::HashSet;
56use std::fs::{File, OpenOptions};
57use std::io::{BufReader, BufWriter, Read, Write};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU64, Ordering};
60use std::time::{Instant, SystemTime, UNIX_EPOCH};
61
62// =============================================================================
63// Coarse-Grained Timestamp Caching (Recommendation 5)
64// =============================================================================
65
/// How long a thread-local timestamp base stays valid, in nanoseconds (1 ms).
///
/// While a base is younger than this, [`cached_timestamp_us`] derives the
/// current time from it plus a monotonic `Instant` offset instead of reading
/// the wall clock again.
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Per-thread `(anchor, base_us)`, where `base_us` is the wall-clock time
    /// (microseconds since the UNIX epoch) observed at `anchor`.
    ///
    /// `base_us == 0` means "not populated yet". The lazy initializer only
    /// records an `Instant`, so the first call on every thread must read the
    /// wall clock instead of trusting this zero base.
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
}

/// Returns the current wall-clock time in microseconds since the UNIX epoch.
///
/// The value is served from a per-thread cache that is re-anchored at most
/// every ~1 ms.
///
/// On the fast path the result is `base_us + elapsed / 1000`, where `elapsed`
/// is measured from the `Instant` captured at the last refresh. This skips
/// `SystemTime::now()`, but it still reads the monotonic clock through
/// `Instant::elapsed`.
///
/// The slow path reads `SystemTime::now()` and re-anchors the cache. It runs on
/// the first call on a thread, or when the base is older than
/// [`CACHE_VALIDITY_NS`].
///
/// ## Ordering
///
/// Within one refresh window, the values returned on a given thread never
/// decrease. Across refreshes the base is re-read from the wall clock, so a
/// clock adjustment (e.g. an NTP step) can make a later value smaller than an
/// earlier one. Values from different threads are not mutually ordered.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
#[inline]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|slot| {
        let (anchor, base_us) = slot.get();
        let elapsed_ns = anchor.elapsed().as_nanos() as u64;

        // A zero base is the unpopulated initial state. Without this check the
        // first call on each thread (made right after the lazy initializer ran)
        // would return roughly 0 + a few µs, i.e. a timestamp in 1970.
        if base_us != 0 && elapsed_ns < CACHE_VALIDITY_NS {
            return base_us + elapsed_ns / 1_000;
        }

        let now_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before UNIX epoch (1970-01-01)")
            .as_micros() as u64;
        slot.set((Instant::now(), now_us));
        now_us
    })
}
112
/// Fixed-size prefix of every WAL record, in bytes (29).
///
/// It consists of the 4-byte length field, the 1-byte record type, the 8-byte
/// transaction id, the 8-byte timestamp, and the two 4-byte key/value length
/// fields. The key/value payloads and the trailing checksum are not included.
const RECORD_HEADER_SIZE: usize = 4 // length prefix
    + 1 // record type
    + 8 // txn_id
    + 8 // timestamp_us
    + 4 // key_len
    + 4; // value_len

/// Size of the trailing CRC32 checksum, in bytes.
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

/// Capacity reserved up front by a new transaction-local buffer (32 KiB).
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 << 10;
121
122// =============================================================================
123// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
124// =============================================================================
125
126/// Transaction-local WAL write buffer
127///
128/// Collects all writes during a transaction in memory, then flushes
129/// everything with a SINGLE lock acquisition at commit time.
130///
131/// ## Performance Impact
132///
133/// ```text
134/// Without TxnWalBuffer (current):
135///   1000 writes × (lock + 9 write_all + unlock) = 1000 lock acquisitions
136///   Lock overhead: 1000 × 20ns = 20µs per transaction
137///
138/// With TxnWalBuffer:
139///   1000 writes buffered locally (NO LOCK)
140///   1 flush at commit (SINGLE LOCK)
141///   Lock overhead: 1 × 20ns = 20ns per transaction
142///   Speedup: 1000× for lock overhead
143/// ```
144///
145/// ## Usage
146///
147/// ```ignore
148/// let mut buffer = TxnWalBuffer::new(txn_id);
149///
150/// // These do NOT acquire any locks
151/// buffer.append(b"key1", b"value1");
152/// buffer.append(b"key2", b"value2");
153/// // ... hundreds of writes ...
154///
155/// // Single lock acquisition, single write syscall
156/// buffer.flush(&wal)?;
157/// ```
158#[derive(Debug)]
159pub struct TxnWalBuffer {
160    /// Transaction ID for all entries
161    txn_id: u64,
162    /// Accumulated serialized entries
163    buffer: Vec<u8>,
164    /// Number of entries buffered
165    entry_count: usize,
166}
167
168impl TxnWalBuffer {
169    /// Create a new buffer for a transaction
170    #[inline]
171    pub fn new(txn_id: u64) -> Self {
172        Self {
173            txn_id,
174            buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
175            entry_count: 0,
176        }
177    }
178
179    /// Create with specific capacity
180    #[inline]
181    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
182        Self {
183            txn_id,
184            buffer: Vec::with_capacity(capacity),
185            entry_count: 0,
186        }
187    }
188
189    /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
190    ///
191    /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
192    /// This is completely lock-free and does not touch the file system.
193    ///
194    /// Uses cached timestamps to eliminate per-write syscall overhead.
195    #[inline]
196    pub fn append(&mut self, key: &[u8], value: &[u8]) {
197        // Use cached timestamp instead of syscall (Recommendation 5)
198        let timestamp_us = cached_timestamp_us();
199
200        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
201        let entry_start = self.buffer.len();
202
203        // Reserve space for length prefix (will fill at end)
204        self.buffer.extend_from_slice(&[0u8; 4]);
205
206        let mut hasher = crc32fast::Hasher::new();
207
208        // Record type (Data = 0)
209        let record_type_byte = WalRecordType::Data as u8;
210        self.buffer.push(record_type_byte);
211        hasher.update(&[record_type_byte]);
212
213        // Transaction ID
214        let txn_bytes = self.txn_id.to_le_bytes();
215        self.buffer.extend_from_slice(&txn_bytes);
216        hasher.update(&txn_bytes);
217
218        // Timestamp
219        let ts_bytes = timestamp_us.to_le_bytes();
220        self.buffer.extend_from_slice(&ts_bytes);
221        hasher.update(&ts_bytes);
222
223        // Key length
224        let key_len_bytes = (key.len() as u32).to_le_bytes();
225        self.buffer.extend_from_slice(&key_len_bytes);
226        hasher.update(&key_len_bytes);
227
228        // Value length
229        let val_len_bytes = (value.len() as u32).to_le_bytes();
230        self.buffer.extend_from_slice(&val_len_bytes);
231        hasher.update(&val_len_bytes);
232
233        // Key data
234        self.buffer.extend_from_slice(key);
235        hasher.update(key);
236
237        // Value data
238        self.buffer.extend_from_slice(value);
239        hasher.update(value);
240
241        // CRC32 checksum
242        self.buffer
243            .extend_from_slice(&hasher.finalize().to_le_bytes());
244
245        // Fill in length prefix (content length, not including the 4-byte length field)
246        let content_len = (total_len - 4) as u32;
247        self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
248
249        self.entry_count += 1;
250    }
251
252    /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
253    ///
254    /// Acquires the WAL lock once and writes all accumulated entries.
255    /// Returns the first sequence number assigned to buffered entries.
256    ///
257    /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
258    /// as it properly handles the private writer field.
259    #[inline]
260    pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
261        wal.flush_buffer(self)
262    }
263
264    /// Clear the buffer (for reuse)
265    #[inline]
266    pub fn clear(&mut self) {
267        self.buffer.clear();
268        self.entry_count = 0;
269    }
270
271    /// Get number of buffered entries
272    #[inline]
273    pub fn entry_count(&self) -> usize {
274        self.entry_count
275    }
276
277    /// Get total bytes buffered
278    #[inline]
279    pub fn bytes_buffered(&self) -> usize {
280        self.buffer.len()
281    }
282
283    /// Check if buffer is empty
284    #[inline]
285    pub fn is_empty(&self) -> bool {
286        self.buffer.is_empty()
287    }
288}
289
290/// WAL entry for transaction-aware operations
291#[derive(Debug, Clone)]
292pub struct TxnWalEntry {
293    /// Record type
294    pub record_type: WalRecordType,
295    /// Transaction ID
296    pub txn_id: u64,
297    /// Timestamp in microseconds
298    pub timestamp_us: u64,
299    /// Key data
300    pub key: Vec<u8>,
301    /// Value data
302    pub value: Vec<u8>,
303}
304
305impl TxnWalEntry {
306    /// Create a new data entry
307    pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
308        Self {
309            record_type: WalRecordType::Data,
310            txn_id,
311            timestamp_us: Self::now_us(),
312            key,
313            value,
314        }
315    }
316
317    /// Create a transaction begin entry
318    pub fn txn_begin(txn_id: u64) -> Self {
319        Self {
320            record_type: WalRecordType::TxnBegin,
321            txn_id,
322            timestamp_us: Self::now_us(),
323            key: Vec::new(),
324            value: Vec::new(),
325        }
326    }
327
328    /// Create a transaction commit entry
329    pub fn txn_commit(txn_id: u64) -> Self {
330        Self {
331            record_type: WalRecordType::TxnCommit,
332            txn_id,
333            timestamp_us: Self::now_us(),
334            key: Vec::new(),
335            value: Vec::new(),
336        }
337    }
338
339    /// Create a transaction abort entry
340    pub fn txn_abort(txn_id: u64) -> Self {
341        Self {
342            record_type: WalRecordType::TxnAbort,
343            txn_id,
344            timestamp_us: Self::now_us(),
345            key: Vec::new(),
346            value: Vec::new(),
347        }
348    }
349
350    /// Create a checkpoint entry
351    pub fn checkpoint(txn_id: u64) -> Self {
352        Self {
353            record_type: WalRecordType::Checkpoint,
354            txn_id,
355            timestamp_us: Self::now_us(),
356            key: Vec::new(),
357            value: Vec::new(),
358        }
359    }
360
361    /// Create a schema change entry
362    pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
363        Self {
364            record_type: WalRecordType::SchemaChange,
365            txn_id,
366            timestamp_us: Self::now_us(),
367            key: Vec::new(),
368            value: schema_data,
369        }
370    }
371
372    /// Get current time in microseconds (uses cached timestamp)
373    #[inline]
374    fn now_us() -> u64 {
375        cached_timestamp_us()
376    }
377
378    /// Calculate CRC32 checksum for this entry
379    ///
380    /// Uses crc32fast for portable, deterministic checksums.
381    /// The checksum covers all fields except the checksum itself.
382    /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
383    pub fn checksum(&self) -> u32 {
384        let mut hasher = crc32fast::Hasher::new();
385        hasher.update(&[self.record_type as u8]);
386        hasher.update(&self.txn_id.to_le_bytes());
387        hasher.update(&self.timestamp_us.to_le_bytes());
388        hasher.update(&(self.key.len() as u32).to_le_bytes());
389        hasher.update(&(self.value.len() as u32).to_le_bytes());
390        hasher.update(&self.key);
391        hasher.update(&self.value);
392        hasher.finalize()
393    }
394
395    /// Serialize to bytes with single-pass CRC calculation
396    ///
397    /// Optimized to calculate CRC while building the buffer,
398    /// avoiding a second pass over all data.
399    pub fn to_bytes(&self) -> Vec<u8> {
400        let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
401        let mut buf = Vec::with_capacity(total_len);
402        let mut hasher = crc32fast::Hasher::new();
403
404        // Length (not including the length field itself) - not included in CRC
405        let content_len = (total_len - 4) as u32;
406        buf.extend_from_slice(&content_len.to_le_bytes());
407
408        // Record type
409        let record_type_byte = self.record_type as u8;
410        buf.push(record_type_byte);
411        hasher.update(&[record_type_byte]);
412
413        // Transaction ID
414        let txn_bytes = self.txn_id.to_le_bytes();
415        buf.extend_from_slice(&txn_bytes);
416        hasher.update(&txn_bytes);
417
418        // Timestamp
419        let ts_bytes = self.timestamp_us.to_le_bytes();
420        buf.extend_from_slice(&ts_bytes);
421        hasher.update(&ts_bytes);
422
423        // Key length
424        let key_len_bytes = (self.key.len() as u32).to_le_bytes();
425        buf.extend_from_slice(&key_len_bytes);
426        hasher.update(&key_len_bytes);
427
428        // Value length
429        let val_len_bytes = (self.value.len() as u32).to_le_bytes();
430        buf.extend_from_slice(&val_len_bytes);
431        hasher.update(&val_len_bytes);
432
433        // Key data
434        buf.extend_from_slice(&self.key);
435        hasher.update(&self.key);
436
437        // Value data
438        buf.extend_from_slice(&self.value);
439        hasher.update(&self.value);
440
441        // CRC32 Checksum (computed in single pass above)
442        buf.extend_from_slice(&hasher.finalize().to_le_bytes());
443
444        buf
445    }
446
447    /// Deserialize from bytes with torn write detection
448    ///
449    /// Returns error if:
450    /// - Length field indicates data is too short
451    /// - CRC32 checksum mismatch (corruption or torn write)
452    /// - Invalid record type
453    pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
454        // Length (allows torn write detection - if length claims more data than exists)
455        let content_len = reader.read_u32::<LittleEndian>()?;
456        if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
457            return Err(SochDBError::Corruption("WAL entry too short".into()));
458        }
459
460        // Record type
461        let record_type_byte = reader.read_u8()?;
462        let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
463            SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
464        })?;
465
466        // Transaction ID
467        let txn_id = reader.read_u64::<LittleEndian>()?;
468
469        // Timestamp
470        let timestamp_us = reader.read_u64::<LittleEndian>()?;
471
472        // Key length
473        let key_len = reader.read_u32::<LittleEndian>()? as usize;
474
475        // Value length
476        let value_len = reader.read_u32::<LittleEndian>()? as usize;
477
478        // Key data
479        let mut key = vec![0u8; key_len];
480        reader.read_exact(&mut key)?;
481
482        // Value data
483        let mut value = vec![0u8; value_len];
484        reader.read_exact(&mut value)?;
485
486        // CRC32 Checksum
487        let stored_checksum = reader.read_u32::<LittleEndian>()?;
488
489        let entry = Self {
490            record_type,
491            txn_id,
492            timestamp_us,
493            key,
494            value,
495        };
496
497        // Verify checksum - detects both corruption and torn writes
498        if entry.checksum() != stored_checksum {
499            return Err(SochDBError::Corruption(format!(
500                "WAL checksum mismatch for txn_id {}: expected {}, got {}",
501                txn_id,
502                entry.checksum(),
503                stored_checksum
504            )));
505        }
506
507        Ok(entry)
508    }
509}
510
511/// Transaction-aware Write-Ahead Log
512pub struct TxnWal {
513    /// Path to WAL file
514    path: PathBuf,
515    /// Buffered writer
516    writer: Mutex<BufWriter<File>>,
517    /// Next transaction ID
518    next_txn_id: AtomicU64,
519    /// Write sequence number
520    sequence: AtomicU64,
521    /// Bytes written since last sync
522    bytes_since_sync: AtomicU64,
523    /// Cached timestamp (microseconds since epoch)
524    /// Updated periodically to avoid syscall per write
525    cached_timestamp_us: AtomicU64,
526}
527
528impl TxnWal {
529    /// Create a new WAL or open existing one
530    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
531        let path = path.as_ref().to_path_buf();
532
533        // Ensure parent directory exists
534        if let Some(parent) = path.parent() {
535            std::fs::create_dir_all(parent)?;
536        }
537
538        let file = OpenOptions::new()
539            .create(true)
540            .append(true)
541            .read(true)
542            .open(&path)?;
543
544        // Use 256KB buffer for better batch performance (default is 8KB)
545        // This reduces system calls when buffering many small writes
546        let now_us = cached_timestamp_us();
547
548        let wal = Self {
549            path,
550            writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
551            next_txn_id: AtomicU64::new(1),
552            sequence: AtomicU64::new(0),
553            bytes_since_sync: AtomicU64::new(0),
554            cached_timestamp_us: AtomicU64::new(now_us),
555        };
556
557        // Recover state from existing WAL
558        wal.recover_state()?;
559
560        Ok(wal)
561    }
562
563    /// Recover state (next txn ID, sequence) from existing WAL
564    ///
565    /// To avoid txn_id collisions when multiple processes open the same
566    /// WAL concurrently, we incorporate the PID into the starting txn_id.
567    /// Format: upper 32 bits = PID, lower 32 bits = counter.
568    /// This guarantees uniqueness across processes without coordination.
569    fn recover_state(&self) -> Result<()> {
570        let file = File::open(&self.path)?;
571        let mut reader = BufReader::new(file);
572        let mut count: u64 = 0;
573
574        // Track the max counter (lower 32 bits) for OUR PID only.
575        // Each process owns its own txn_id space: upper 32 bits = PID.
576        // We must NOT use max_txn_id across ALL PIDs, because that would
577        // place us into another PID's ID space and cause collisions.
578        let our_pid = std::process::id() as u64;
579        let pid_base = our_pid << 32;
580        let mut max_our_counter: u64 = 0;
581
582        loop {
583            match TxnWalEntry::from_reader(&mut reader) {
584                Ok(entry) => {
585                    count += 1;
586                    // Only track counters from entries that belong to our PID
587                    let entry_pid = entry.txn_id >> 32;
588                    if entry_pid == our_pid {
589                        let entry_counter = entry.txn_id & 0xFFFF_FFFF;
590                        if entry_counter > max_our_counter {
591                            max_our_counter = entry_counter;
592                        }
593                    }
594                }
595                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
596                    break;
597                }
598                Err(_) => {
599                    // Ignore corruption at end of WAL (incomplete write)
600                    break;
601                }
602            }
603        }
604
605        // Start from pid_base + (max counter we've seen for our PID + 1).
606        // This ensures:
607        //   1. Unique across processes (different PID → different upper 32 bits)
608        //   2. Unique within this process even if PID is recycled (we skip
609        //      over any counters already used by a previous process with our PID)
610        let next_id = pid_base + max_our_counter + 1;
611
612        self.next_txn_id.store(next_id, Ordering::SeqCst);
613        self.sequence.store(count, Ordering::SeqCst);
614
615        Ok(())
616    }
617
618    /// Get cached timestamp, updating if stale (>1ms old)
619    ///
620    /// This avoids a syscall per write by caching the timestamp.
621    /// For WAL purposes, ~1ms granularity is sufficient.
622    #[inline]
623    fn get_cached_timestamp(&self) -> u64 {
624        // Fast path: use cached value (no syscall)
625        let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
626
627        // Refresh occasionally (every ~1000 writes or when sequence wraps)
628        // This is a very cheap check since sequence is incremented atomically anyway
629        let seq = self.sequence.load(Ordering::Relaxed);
630        if seq & 0x3FF == 0 {
631            // Refresh every 1024 writes
632            let now_us = cached_timestamp_us();
633            self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
634            return now_us;
635        }
636
637        cached
638    }
639
640    /// Append an entry to the WAL
641    ///
642    /// Returns the sequence number of this write.
643    ///
644    /// # Durability Contract
645    ///
646    /// This method writes data to the BufWriter but does **NOT** fsync.
647    /// The data is NOT durable until `flush()` + `sync()` are called.
648    ///
649    /// **For transaction commits, use `commit_transaction()` or `commit_durable()`**
650    /// which enforce fsync. This method is intentionally non-durable for
651    /// batching data writes within a transaction (pre-commit records).
652    ///
653    /// The group commit path (`EventDrivenGroupCommit`) is responsible for
654    /// calling `flush()` + `sync()` after batching multiple commit records.
655    pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
656        let bytes = entry.to_bytes();
657        let mut writer = self.writer.lock();
658
659        writer.write_all(&bytes)?;
660        // Don't flush here - BufWriter will batch writes automatically.
661        // Call flush() explicitly before sync() or commit().
662
663        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
664        self.bytes_since_sync
665            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
666
667        Ok(seq)
668    }
669
670    /// Append entry without flushing (for batched writes)
671    ///
672    /// Caller must call flush() or sync() afterward to ensure durability.
673    #[inline]
674    pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
675        let bytes = entry.to_bytes();
676        let mut writer = self.writer.lock();
677
678        writer.write_all(&bytes)?;
679        // No flush - let BufWriter buffer the writes
680
681        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
682        self.bytes_since_sync
683            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
684
685        Ok(seq)
686    }
687
688    /// Write data without flushing (for batched writes within a transaction)
689    #[inline]
690    pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
691        let entry = TxnWalEntry::data(txn_id, key, value);
692        self.append_no_flush(&entry)
693    }
694
695    /// Write data from slices without any allocation
696    ///
697    /// This is the fastest path for writing - no intermediate Vec allocations.
698    /// Calculates CRC32 while serializing directly to BufWriter.
699    #[inline]
700    pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
701        // Use coarse timestamp (cached every ~1ms) instead of syscall per write
702        let timestamp_us = self.get_cached_timestamp();
703
704        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
705        let mut hasher = crc32fast::Hasher::new();
706
707        let mut writer = self.writer.lock();
708
709        // Length (not included in CRC)
710        let content_len = (total_len - 4) as u32;
711        writer.write_all(&content_len.to_le_bytes())?;
712
713        // Record type
714        let record_type_byte = WalRecordType::Data as u8;
715        writer.write_all(&[record_type_byte])?;
716        hasher.update(&[record_type_byte]);
717
718        // Transaction ID
719        let txn_bytes = txn_id.to_le_bytes();
720        writer.write_all(&txn_bytes)?;
721        hasher.update(&txn_bytes);
722
723        // Timestamp
724        let ts_bytes = timestamp_us.to_le_bytes();
725        writer.write_all(&ts_bytes)?;
726        hasher.update(&ts_bytes);
727
728        // Key length
729        let key_len_bytes = (key.len() as u32).to_le_bytes();
730        writer.write_all(&key_len_bytes)?;
731        hasher.update(&key_len_bytes);
732
733        // Value length
734        let val_len_bytes = (value.len() as u32).to_le_bytes();
735        writer.write_all(&val_len_bytes)?;
736        hasher.update(&val_len_bytes);
737
738        // Key data
739        writer.write_all(key)?;
740        hasher.update(key);
741
742        // Value data
743        writer.write_all(value)?;
744        hasher.update(value);
745
746        // CRC32 checksum
747        writer.write_all(&hasher.finalize().to_le_bytes())?;
748
749        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
750        self.bytes_since_sync
751            .fetch_add(total_len as u64, Ordering::Relaxed);
752
753        Ok(seq)
754    }
755
756    /// Flush pending writes to kernel buffer (but not to disk)
757    pub fn flush(&self) -> Result<()> {
758        let mut writer = self.writer.lock();
759        writer.flush()?;
760        Ok(())
761    }
762
763    /// Append and immediately sync for durability
764    pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
765        let seq = self.append(entry)?;
766        self.sync()?;
767        Ok(seq)
768    }
769
770    /// Force sync to disk.
771    ///
772    /// Must flush the BufWriter BEFORE fsync: `get_ref()` reaches the raw File
773    /// and bypasses the 256 KB buffer, so without an explicit `flush()` the
774    /// just-appended commit/checkpoint record may still sit in userspace and
775    /// `sync_all()` would fsync stale bytes — silently breaking the durability
776    /// guarantee `append_sync`/`commit`/`checkpoint` advertise.
777    pub fn sync(&self) -> Result<()> {
778        let mut writer = self.writer.lock();
779        writer.flush()?;
780        writer.get_ref().sync_all()?;
781        self.bytes_since_sync.store(0, Ordering::Relaxed);
782        Ok(())
783    }
784
785    /// Flush a TxnWalBuffer with single lock acquisition
786    ///
787    /// This is the high-performance path for transaction commit:
788    /// - All writes during the transaction are buffered locally
789    /// - At commit, this method flushes everything with ONE lock
790    ///
791    /// ## Performance
792    ///
793    /// ```text
794    /// 1000 writes with individual flush: 1000 × lock overhead
795    /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
796    /// Speedup: ~1000× for lock overhead
797    /// ```
798    #[inline]
799    pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
800        if buffer.is_empty() {
801            return Ok(0);
802        }
803
804        let mut writer = self.writer.lock();
805        writer.write_all(&buffer.buffer)?;
806
807        let seq = self
808            .sequence
809            .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
810        self.bytes_since_sync
811            .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
812
813        Ok(seq)
814    }
815
816    /// Get the current size of the WAL file in bytes
817    pub fn size_bytes(&self) -> u64 {
818        std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
819    }
820
821    /// Allocate a new transaction ID
822    pub fn alloc_txn_id(&self) -> u64 {
823        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
824    }
825
826    /// Begin a new transaction
827    pub fn begin_transaction(&self) -> Result<u64> {
828        let txn_id = self.alloc_txn_id();
829        let entry = TxnWalEntry::txn_begin(txn_id);
830        self.append(&entry)?;
831        Ok(txn_id)
832    }
833
834    /// Commit a transaction (with fsync for durability)
835    ///
836    /// This flushes all pending writes and then fsyncs the commit record.
837    ///
838    /// # Durability Guarantee
839    ///
840    /// After this method returns `Ok(())`, the commit record is durable on disk.
841    /// The transaction is guaranteed to survive process crash, OS crash, and
842    /// power failure (assuming the storage device honors fsync correctly).
843    ///
844    /// # Performance
845    ///
846    /// Each call performs an fsync (~5ms on HDD, ~0.1ms on NVMe).
847    /// For high-throughput workloads, use `EventDrivenGroupCommit` which
848    /// batches multiple commits into a single fsync via `commit_durable_batch()`.
849    pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
850        // First flush any pending buffered writes
851        self.flush()?;
852
853        // Then write commit record with fsync
854        let entry = TxnWalEntry::txn_commit(txn_id);
855        self.append_sync(&entry)?;
856        Ok(())
857    }
858
859    /// Commit a batch of transactions with a single fsync (group commit).
860    ///
861    /// Writes commit records for all transaction IDs, then performs a single
862    /// flush + fsync. This amortizes the fsync cost across N transactions,
863    /// achieving ~N× throughput improvement over individual commits.
864    ///
865    /// # Durability Guarantee
866    ///
867    /// After this method returns `Ok(())`, ALL transactions in the batch
868    /// are durable on disk. Either all commit records are visible after
869    /// crash recovery, or none are (atomic batch durability).
870    ///
871    /// # Usage
872    ///
873    /// This method is called by `EventDrivenGroupCommit::flush_fn` to
874    /// implement the group commit pattern. Do not call directly unless
875    /// you are implementing your own commit batching.
876    pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
877        // Write all commit records without flushing (batch them in BufWriter)
878        for &txn_id in txn_ids {
879            let entry = TxnWalEntry::txn_commit(txn_id);
880            self.append_no_flush(&entry)?;
881        }
882
883        // Single flush + fsync for the entire batch
884        self.flush()?;
885        self.sync()?;
886        Ok(())
887    }
888
889    /// Abort a transaction
890    pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
891        let entry = TxnWalEntry::txn_abort(txn_id);
892        self.append(&entry)?;
893        Ok(())
894    }
895
896    /// Write data within a transaction
897    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
898        let entry = TxnWalEntry::data(txn_id, key, value);
899        self.append(&entry)
900    }
901
902    /// Replay WAL for crash recovery
903    ///
904    /// Returns (committed_writes, recovered_txn_count)
905    #[allow(clippy::type_complexity)]
906    pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
907        let file = File::open(&self.path)?;
908        let mut reader = BufReader::new(file);
909
910        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
911            std::collections::HashMap::new();
912        let mut result = Vec::new();
913        let mut txn_count = 0;
914
915        // Single pass in WAL order. Transaction ids are process-local and
916        // short-lived CLI processes can reuse the same ids after restart, so
917        // grouping the entire WAL by txn_id would merge unrelated transactions
918        // and replay older writes after newer ones.
919        loop {
920            match TxnWalEntry::from_reader(&mut reader) {
921                Ok(entry) => match entry.record_type {
922                    WalRecordType::TxnBegin => {
923                        pending_writes.insert(entry.txn_id, Vec::new());
924                    }
925                    WalRecordType::Data => {
926                        // Accept data for any txn_id we've seen a Begin for,
927                        // and also for txn_ids without a Begin (they might have
928                        // their Begin later in the WAL due to buffered writes).
929                        pending_writes
930                            .entry(entry.txn_id)
931                            .or_insert_with(Vec::new)
932                            .push((entry.key, entry.value));
933                    }
934                    WalRecordType::TxnCommit => {
935                        if let Some(writes) = pending_writes.remove(&entry.txn_id) {
936                            result.extend(writes);
937                            txn_count += 1;
938                        }
939                    }
940                    WalRecordType::TxnAbort => {
941                        pending_writes.remove(&entry.txn_id);
942                    }
943                    _ => {}
944                },
945                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
946                    break;
947                }
948                Err(_) => {
949                    break;
950                }
951            }
952        }
953
954        Ok((result, txn_count))
955    }
956
957    /// Replay WAL with a callback
958    pub fn replay<F>(&self, mut callback: F) -> Result<u64>
959    where
960        F: FnMut(TxnWalEntry) -> Result<()>,
961    {
962        let file = File::open(&self.path)?;
963        let mut reader = BufReader::new(file);
964        let mut count = 0u64;
965
966        loop {
967            match TxnWalEntry::from_reader(&mut reader) {
968                Ok(entry) => {
969                    callback(entry)?;
970                    count += 1;
971                }
972                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
973                    break;
974                }
975                Err(e) => {
976                    // Log warning but continue (may be incomplete entry at end)
977                    eprintln!("WAL replay warning: {:?}", e);
978                    break;
979                }
980            }
981        }
982
983        Ok(count)
984    }
985
986    /// Truncate WAL (called after successful checkpoint)
987    ///
988    /// Flushes any buffered writes, truncates the file to 0 bytes,
989    /// and resets sequence counters. The file is opened in `O_APPEND`
990    /// mode so subsequent writes will correctly start at offset 0.
991    ///
992    /// **WARNING**: After truncation, all data durability is lost.
993    /// The in-memory memtable still holds data for the current session,
994    /// but a crash after truncation means the data cannot be recovered
995    /// from the WAL.
996    pub fn truncate(&self) -> Result<()> {
997        let mut writer = self.writer.lock();
998        // Flush BufWriter so no stale data is written after truncation
999        writer.flush()?;
1000        let file = writer.get_ref();
1001        file.set_len(0)?;
1002        file.sync_all()?;
1003        self.sequence.store(0, Ordering::SeqCst);
1004        self.bytes_since_sync.store(0, Ordering::Relaxed);
1005        Ok(())
1006    }
1007
1008    /// Write a checkpoint marker
1009    pub fn write_checkpoint(&self) -> Result<u64> {
1010        let entry = TxnWalEntry::checkpoint(0);
1011        self.append_sync(&entry)
1012    }
1013
1014    /// Write a Compensation Log Record (CLR) for undo operations
1015    ///
1016    /// CLRs are redo-only records that skip past already undone operations
1017    /// during recovery. The undo_next_lsn field tells recovery where to
1018    /// continue undoing after this CLR.
1019    pub fn append_clr(
1020        &self,
1021        txn_id: u64,
1022        _original_lsn: u64,
1023        undo_next_lsn: Option<u64>,
1024        undo_data: &[u8],
1025    ) -> Result<u64> {
1026        // Encode undo_next_lsn into the key field
1027        let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1028        let entry = TxnWalEntry {
1029            record_type: WalRecordType::CompensationLogRecord,
1030            txn_id,
1031            timestamp_us: TxnWalEntry::now_us(),
1032            key, // undo_next_lsn encoded in key
1033            value: undo_data.to_vec(),
1034        };
1035        self.append(&entry)
1036    }
1037
1038    /// Write checkpoint with data (for fuzzy checkpoints)
1039    pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1040        let entry = TxnWalEntry {
1041            record_type: WalRecordType::Checkpoint,
1042            txn_id: 0,
1043            timestamp_us: TxnWalEntry::now_us(),
1044            key: Vec::new(),
1045            value: checkpoint_data.to_vec(),
1046        };
1047        self.append_sync(&entry)
1048    }
1049
1050    /// Write checkpoint end with captured state
1051    pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1052        let entry = TxnWalEntry {
1053            record_type: WalRecordType::CheckpointEnd,
1054            txn_id: 0,
1055            timestamp_us: TxnWalEntry::now_us(),
1056            key: Vec::new(),
1057            value: checkpoint_data.to_vec(),
1058        };
1059        self.append_sync(&entry)
1060    }
1061
1062    /// Get current sequence number
1063    pub fn sequence(&self) -> u64 {
1064        self.sequence.load(Ordering::SeqCst)
1065    }
1066
1067    /// Get bytes written since last sync
1068    pub fn bytes_since_sync(&self) -> u64 {
1069        self.bytes_since_sync.load(Ordering::Relaxed)
1070    }
1071
1072    /// Get path to WAL file
1073    pub fn path(&self) -> &Path {
1074        &self.path
1075    }
1076}
1077
/// Point-in-time counters describing a `TxnWal`.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly, for
/// example in tests or when detecting change between two polls.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TxnWalStats {
    /// Value of the WAL sequence counter: entries written since the WAL was
    /// created or last truncated
    pub entries_written: u64,
    /// Bytes written since the byte counter was last reset
    pub bytes_since_sync: u64,
    /// The transaction ID the next allocation will hand out
    pub next_txn_id: u64,
}
1088
1089// ============================================================================
1090// Sharded WAL for Reduced Mutex Contention
1091// ============================================================================
1092
/// Sharded Write-Ahead Log for high-concurrency workloads
///
/// Instead of a single Mutex<File>, it uses multiple shards to reduce contention:
/// - Each writer is routed to a shard by the low bits of its txn_id
///   (`txn_id & (num_shards - 1)`)
/// - Each shard has its own in-memory buffer behind its own mutex
/// - `flush`/`sync` copy every shard buffer into one central buffered file
///   writer, which then does the fsync
///
/// Appends that land on different shards no longer serialize on one lock.
/// Appends that share a shard, and all flushes, still serialize on a mutex.
///
/// NOTE(review): `flush` copies shard buffers in shard-index order. When
/// several transactions on different shards are buffered at the same time,
/// their records (including commit records) reach the file grouped by shard
/// rather than in global `sequence` order. Records of a single transaction
/// share one shard and keep their order. Confirm that recovery does not
/// depend on the relative order of commits from different shards.
// NOTE(review): `path` is stored but no method of this type reads it, which is
// presumably why dead_code is allowed here; confirm.
#[allow(dead_code)]
pub struct ShardedWal {
    /// Shard buffers; `shard_idx(txn_id)` picks the shard
    shards: Vec<parking_lot::Mutex<WalShard>>,
    /// Number of shards (always a power of two, so it can be used as a mask)
    num_shards: usize,
    /// Buffered writer for the single WAL file that all shards drain into
    central_writer: parking_lot::Mutex<BufWriter<File>>,
    /// Next transaction ID to allocate
    next_txn_id: AtomicU64,
    /// Global append counter (one increment per appended entry)
    sequence: AtomicU64,
    /// Path of the WAL file
    path: PathBuf,
}
1116
1117/// Individual WAL shard buffer
1118struct WalShard {
1119    /// Buffered entries for this shard
1120    buffer: Vec<u8>,
1121    /// Number of entries buffered
1122    entry_count: usize,
1123}
1124
1125impl WalShard {
1126    fn new() -> Self {
1127        Self {
1128            buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1129            entry_count: 0,
1130        }
1131    }
1132
1133    fn append(&mut self, entry: &TxnWalEntry) {
1134        let bytes = entry.to_bytes();
1135        self.buffer.extend_from_slice(&bytes);
1136        self.entry_count += 1;
1137    }
1138
1139    fn is_empty(&self) -> bool {
1140        self.buffer.is_empty()
1141    }
1142
1143    fn drain(&mut self) -> Vec<u8> {
1144        self.entry_count = 0;
1145        std::mem::take(&mut self.buffer)
1146    }
1147}
1148
1149impl ShardedWal {
1150    /// Create sharded WAL with specified number of shards
1151    ///
1152    /// Recommended: 4-8 shards for typical server workloads
1153    pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1154        let path = path.as_ref().to_path_buf();
1155
1156        if let Some(parent) = path.parent() {
1157            std::fs::create_dir_all(parent)?;
1158        }
1159
1160        let file = std::fs::OpenOptions::new()
1161            .create(true)
1162            .append(true)
1163            .read(true)
1164            .open(&path)?;
1165
1166        // Round up to power of 2 for fast modulo
1167        let num_shards = num_shards.next_power_of_two();
1168        let shards: Vec<_> = (0..num_shards)
1169            .map(|_| parking_lot::Mutex::new(WalShard::new()))
1170            .collect();
1171
1172        Ok(Self {
1173            shards,
1174            num_shards,
1175            central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1176            next_txn_id: AtomicU64::new(1),
1177            sequence: AtomicU64::new(0),
1178            path,
1179        })
1180    }
1181
1182    /// Get shard index for transaction
1183    #[inline]
1184    fn shard_idx(&self, txn_id: u64) -> usize {
1185        (txn_id as usize) & (self.num_shards - 1)
1186    }
1187
1188    /// Append entry to appropriate shard (lock-free for different txns)
1189    pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1190        let shard_idx = self.shard_idx(entry.txn_id);
1191        let mut shard = self.shards[shard_idx].lock();
1192        shard.append(entry);
1193        self.sequence.fetch_add(1, Ordering::SeqCst)
1194    }
1195
1196    /// Allocate transaction ID
1197    pub fn alloc_txn_id(&self) -> u64 {
1198        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1199    }
1200
1201    /// Flush all shard buffers to central file
1202    pub fn flush(&self) -> Result<()> {
1203        let mut central = self.central_writer.lock();
1204
1205        // Collect all shard buffers (brief lock per shard)
1206        for shard in &self.shards {
1207            let mut shard_guard = shard.lock();
1208            if !shard_guard.is_empty() {
1209                let data = shard_guard.drain();
1210                central.write_all(&data)?;
1211            }
1212        }
1213
1214        central.flush()?;
1215        Ok(())
1216    }
1217
1218    /// Sync to disk (fsync)
1219    pub fn sync(&self) -> Result<()> {
1220        self.flush()?;
1221        let central = self.central_writer.lock();
1222        central.get_ref().sync_all()?;
1223        Ok(())
1224    }
1225
1226    /// Begin transaction
1227    pub fn begin_transaction(&self) -> Result<u64> {
1228        let txn_id = self.alloc_txn_id();
1229        let entry = TxnWalEntry::txn_begin(txn_id);
1230        self.append(&entry);
1231        Ok(txn_id)
1232    }
1233
1234    /// Write data
1235    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1236        let entry = TxnWalEntry::data(txn_id, key, value);
1237        Ok(self.append(&entry))
1238    }
1239
1240    /// Commit transaction
1241    pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1242        let entry = TxnWalEntry::txn_commit(txn_id);
1243        let seq = self.append(&entry);
1244        self.sync()?; // Fsync on commit for durability
1245        Ok(seq)
1246    }
1247
1248    /// Get statistics
1249    pub fn stats(&self) -> ShardedWalStats {
1250        let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1251        for shard in &self.shards {
1252            shard_entry_counts.push(shard.lock().entry_count);
1253        }
1254
1255        ShardedWalStats {
1256            num_shards: self.num_shards,
1257            total_entries: self.sequence.load(Ordering::SeqCst),
1258            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1259            shard_entry_counts,
1260        }
1261    }
1262}
1263
/// Point-in-time statistics for a `ShardedWal`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShardedWalStats {
    /// Number of shards (always a power of two)
    pub num_shards: usize,
    /// Value of the global sequence counter: entries appended so far
    pub total_entries: u64,
    /// The transaction ID the next allocation will hand out
    pub next_txn_id: u64,
    /// Per-shard number of entries buffered but not yet flushed, indexed by shard
    pub shard_entry_counts: Vec<usize>,
}
1272
/// Detailed crash recovery statistics
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CrashRecoveryStats {
    /// Total records successfully decoded from the WAL
    pub total_records: u64,
    /// Number of committed transactions whose writes were recovered
    pub committed_txns: u64,
    /// Number of transactions that began but never committed or aborted
    /// (rolled back by recovery)
    pub rolled_back_txns: u64,
    /// Number of explicitly aborted transactions
    pub aborted_txns: u64,
    /// Number of data writes recovered
    pub recovered_writes: u64,
    /// Number of torn/corrupted records at the point reading stopped
    /// (expected after a crash)
    pub torn_records: u64,
    /// Size of the WAL file in bytes when recovery started. This is the whole
    /// file, even if reading stopped early at a torn record.
    pub bytes_read: u64,
    /// Recovery duration in microseconds
    pub recovery_duration_us: u64,
    /// Highest transaction ID seen (for restarting the ID counter)
    pub max_txn_id: u64,
}
1295
1296impl TxnWal {
1297    /// Get WAL statistics
1298    pub fn stats(&self) -> TxnWalStats {
1299        TxnWalStats {
1300            entries_written: self.sequence.load(Ordering::SeqCst),
1301            bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1302            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1303        }
1304    }
1305
1306    /// Full crash recovery with detailed statistics
1307    ///
1308    /// This method provides ACID recovery guarantees:
1309    /// 1. **Atomicity**: Uncommitted transactions are rolled back
1310    /// 2. **Durability**: All committed transactions are replayed
1311    /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1312    ///
1313    /// Returns (committed_writes, stats) where committed_writes contains
1314    /// all key-value pairs from committed transactions in order.
1315    #[allow(clippy::type_complexity)]
1316    pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1317        let start_time = std::time::Instant::now();
1318        let file = File::open(&self.path)?;
1319        let file_size = file.metadata()?.len();
1320        let mut reader = BufReader::new(file);
1321
1322        let mut stats = CrashRecoveryStats {
1323            bytes_read: file_size,
1324            ..Default::default()
1325        };
1326
1327        let mut committed_txns: HashSet<u64> = HashSet::new();
1328        let mut aborted_txns: HashSet<u64> = HashSet::new();
1329        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1330            std::collections::HashMap::new();
1331        let mut all_txns: HashSet<u64> = HashSet::new();
1332
1333        // Read all records, stopping at first corruption (torn write)
1334        loop {
1335            match TxnWalEntry::from_reader(&mut reader) {
1336                Ok(entry) => {
1337                    stats.total_records += 1;
1338                    if entry.txn_id > stats.max_txn_id {
1339                        stats.max_txn_id = entry.txn_id;
1340                    }
1341
1342                    match entry.record_type {
1343                        WalRecordType::TxnBegin => {
1344                            pending_writes.insert(entry.txn_id, Vec::new());
1345                            all_txns.insert(entry.txn_id);
1346                        }
1347                        WalRecordType::Data => {
1348                            if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1349                                writes.push((entry.key, entry.value));
1350                            }
1351                        }
1352                        WalRecordType::TxnCommit => {
1353                            committed_txns.insert(entry.txn_id);
1354                        }
1355                        WalRecordType::TxnAbort => {
1356                            pending_writes.remove(&entry.txn_id);
1357                            aborted_txns.insert(entry.txn_id);
1358                        }
1359                        _ => {}
1360                    }
1361                }
1362                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1363                    // Clean EOF
1364                    break;
1365                }
1366                Err(_) => {
1367                    // Corruption or torn write - stop here
1368                    stats.torn_records += 1;
1369                    break;
1370                }
1371            }
1372        }
1373
1374        // Collect committed writes
1375        let mut result = Vec::new();
1376        for (txn_id, writes) in &pending_writes {
1377            if committed_txns.contains(txn_id) {
1378                stats.committed_txns += 1;
1379                stats.recovered_writes += writes.len() as u64;
1380                result.extend(writes.clone());
1381            }
1382        }
1383
1384        // Count aborted and rolled-back transactions
1385        stats.aborted_txns = aborted_txns.len() as u64;
1386        stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1387
1388        stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1389
1390        Ok((result, stats))
1391    }
1392}
1393
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_wal_entry_roundtrip() {
        let entry = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        let bytes = entry.to_bytes();

        let mut cursor = std::io::Cursor::new(bytes);
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();

        assert_eq!(recovered.record_type, WalRecordType::Data);
        assert_eq!(recovered.txn_id, 42);
        assert_eq!(recovered.key, b"key");
        assert_eq!(recovered.value, b"value");
    }

    #[test]
    fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write some entries
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn_id = wal.begin_transaction().unwrap();
            wal.write(txn_id, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn_id, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn_id).unwrap();
        }

        // Replay and verify
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 1);
            assert_eq!(writes.len(), 2);
            assert_eq!(writes[0], (b"k1".to_vec(), b"v1".to_vec()));
            assert_eq!(writes[1], (b"k2".to_vec(), b"v2".to_vec()));
        }
    }

    #[test]
    fn test_uncommitted_transaction_rollback() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write committed and uncommitted transactions
        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // Committed transaction
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(txn1).unwrap();

            // Uncommitted transaction (simulates crash)
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
            // No commit!
        }

        // Replay - uncommitted should be rolled back
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 1); // Only committed transaction
            assert_eq!(writes.len(), 1);
            assert_eq!(writes[0], (b"committed".to_vec(), b"yes".to_vec()));
        }
    }

    #[test]
    fn test_aborted_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(txn).unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 0);
            assert!(writes.is_empty());
        }
    }

    #[test]
    fn test_checksum_validation() {
        let entry = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec());
        let mut bytes = entry.to_bytes();

        // Corrupt the checksum
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;

        let mut cursor = std::io::Cursor::new(bytes);
        let result = TxnWalEntry::from_reader(&mut cursor);

        assert!(result.is_err());
    }

    #[test]
    fn test_crash_recovery_with_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Simulate complex workload
        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // Committed transaction 1
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn1, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn1).unwrap();

            // Aborted transaction
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(txn2).unwrap();

            // Committed transaction 2
            let txn3 = wal.begin_transaction().unwrap();
            wal.write(txn3, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(txn3).unwrap();

            // Uncommitted transaction (simulates crash)
            let txn4 = wal.begin_transaction().unwrap();
            wal.write(txn4, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
            // No commit - simulates crash
        }

        // Recover and verify
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            // Should have 3 writes from 2 committed transactions
            assert_eq!(writes.len(), 3);
            assert_eq!(stats.committed_txns, 2);
            assert_eq!(stats.aborted_txns, 1);
            assert_eq!(stats.rolled_back_txns, 1); // txn4 was uncommitted
            assert_eq!(stats.recovered_writes, 3);
            assert!(stats.recovery_duration_us > 0);
        }
    }

    #[test]
    fn test_torn_write_detection() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write a valid transaction
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        // Append corrupted bytes to simulate torn write
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            // Write partial record (torn write)
            file.write_all(&[0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF])
                .unwrap();
        }

        // Recovery should still work, detecting torn write
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            // Should recover the valid transaction
            assert_eq!(writes.len(), 1);
            assert_eq!(stats.committed_txns, 1);
            assert_eq!(stats.torn_records, 1);
        }
    }

    #[test]
    fn test_crc32_determinism() {
        // Verify CRC32 produces consistent checksums for same content
        let mut entry1 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        entry1.timestamp_us = 12345; // Fixed timestamp for determinism

        let mut entry2 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        entry2.timestamp_us = 12345; // Same timestamp

        assert_eq!(entry1.checksum(), entry2.checksum());

        // Different content should produce different checksum
        let mut entry3 = TxnWalEntry::data(42, b"key".to_vec(), b"different".to_vec());
        entry3.timestamp_us = 12345;
        assert_ne!(entry1.checksum(), entry3.checksum());

        // Verify roundtrip preserves checksum
        let bytes = entry1.to_bytes();
        let mut cursor = std::io::Cursor::new(bytes);
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
        assert_eq!(recovered.checksum(), entry1.checksum());
    }

    // Group commit: every transaction in the batch must be replayable.
    #[test]
    fn test_commit_durable_batch_commits_every_txn() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"a".to_vec(), b"1".to_vec()).unwrap();
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"b".to_vec(), b"2".to_vec()).unwrap();
            wal.commit_durable_batch(&[txn1, txn2]).unwrap();

            // An empty batch is a no-op that still succeeds.
            wal.commit_durable_batch(&[]).unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 2);
            assert_eq!(
                writes,
                vec![
                    (b"a".to_vec(), b"1".to_vec()),
                    (b"b".to_vec(), b"2".to_vec()),
                ]
            );
        }
    }

    #[test]
    fn test_truncate_empties_wal() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let wal = TxnWal::new(&wal_path).unwrap();
        let txn = wal.begin_transaction().unwrap();
        wal.write(txn, b"k".to_vec(), b"v".to_vec()).unwrap();
        wal.commit_transaction(txn).unwrap();

        wal.truncate().unwrap();
        assert_eq!(wal.sequence(), 0);
        assert_eq!(wal.bytes_since_sync(), 0);
        assert_eq!(wal.size_bytes(), 0);

        let (writes, txn_count) = wal.replay_for_recovery().unwrap();
        assert_eq!(txn_count, 0);
        assert!(writes.is_empty());
    }

    // ShardedWal writes the same record format, so TxnWal can replay it.
    #[test]
    fn test_sharded_wal_commit_is_replayable() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("sharded.wal");

        {
            let wal = ShardedWal::new(&wal_path, 4).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"sk".to_vec(), b"sv".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();

            let stats = wal.stats();
            assert_eq!(stats.num_shards, 4);
            // Everything was flushed by the commit's sync.
            assert!(stats.shard_entry_counts.iter().all(|&n| n == 0));
        }

        let wal = TxnWal::new(&wal_path).unwrap();
        let (writes, txn_count) = wal.replay_for_recovery().unwrap();
        assert_eq!(txn_count, 1);
        assert_eq!(writes, vec![(b"sk".to_vec(), b"sv".to_vec())]);
    }

    #[test]
    fn test_sharded_wal_rounds_shard_count_up() {
        let dir = tempdir().unwrap();

        let three = ShardedWal::new(dir.path().join("three.wal"), 3).unwrap();
        assert_eq!(three.stats().num_shards, 4);

        let zero = ShardedWal::new(dir.path().join("zero.wal"), 0).unwrap();
        assert_eq!(zero.stats().num_shards, 1);
    }
}