Skip to main content

sochdb_storage/
txn_wal.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Transaction-Aware WAL for ACID Transactions
19//!
20//! This WAL implementation provides ACID transaction support:
21//! - Atomicity: All writes in a transaction are logged together
22//! - Consistency: Schema validation before commit
23//! - Isolation: Transaction IDs for MVCC
24//! - Durability: fsync after commit
25//!
26//! ## WAL Record Types
27//!
28//! - Data: Key-value write
29//! - TxnBegin: Start of transaction
30//! - TxnCommit: Commit point (durability guarantee)
31//! - TxnAbort: Transaction rollback
32//! - Checkpoint: Snapshot marker for recovery
33//! - SchemaChange: DDL operations
34//!
35//! ## Record Format (Fixed Layout for Torn Write Detection)
36//!
37//! ```text
38//! ┌───────────┬──────────┬─────────┬───────────┬─────────┬───────────┬─────────┬─────────┬──────────┐
39//! │ Length(4) │ Type(1)  │ TxnId(8)│ Timestamp │ KeyLen  │ ValueLen  │ Key(*)  │ Value(*)│ CRC32(4) │
40//! │           │          │         │   (8)     │  (4)    │   (4)     │         │         │          │
41//! └───────────┴──────────┴─────────┴───────────┴─────────┴───────────┴─────────┴─────────┴──────────┘
42//! ```
43//!
44//! ## Crash Recovery Guarantees
45//!
46//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
47//! 2. **Atomic Commit**: Commit record with fsync ensures durability
48//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
49//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
50
51use byteorder::{LittleEndian, ReadBytesExt};
52use parking_lot::Mutex;
53use std::cell::Cell;
54use std::collections::HashSet;
55use std::fs::{File, OpenOptions};
56use std::io::{BufReader, BufWriter, Read, Write};
57use std::path::{Path, PathBuf};
58use std::sync::atomic::{AtomicU64, Ordering};
59use std::time::{Instant, SystemTime, UNIX_EPOCH};
60use sochdb_core::{Result, SochDBError, WalRecordType};
61
62// =============================================================================
63// Coarse-Grained Timestamp Caching (Recommendation 5)
64// =============================================================================
65
/// Cache validity period in nanoseconds (1 ms).
///
/// While the cache is younger than this, [`cached_timestamp_us`] answers from
/// the cached base plus a monotonic offset instead of reading the wall clock.
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Thread-local timestamp cache: `(instant of last refresh, wall-clock µs at that refresh)`.
    ///
    /// The timestamp half starts at 0, which `cached_timestamp_us` treats as
    /// "never refreshed on this thread".
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
}

/// Get the current time in microseconds since the UNIX epoch, served from a
/// per-thread cache.
///
/// The wall clock (`SystemTime::now()`) is read only when the cache has never
/// been filled on this thread or is at least [`CACHE_VALIDITY_NS`] old. In
/// between, the result is the cached wall-clock value plus the time elapsed on
/// the monotonic `Instant` clock since the refresh.
///
/// ## Monotonicity
///
/// Within one thread the returned values never decrease: a refresh is clamped
/// so it cannot fall below `cached base + elapsed`, which bounds every value
/// the fast path has already handed out. If the wall clock is stepped
/// backwards, timestamps keep advancing from the old base until the wall
/// clock catches up.
///
/// # Panics
///
/// Panics if the system clock is set before the UNIX epoch.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (refreshed_at, base_us) = cache.get();
        let elapsed_ns = refreshed_at.elapsed().as_nanos() as u64;

        // `base_us == 0` marks a cache that was only just lazily created; without
        // this check the first call on every thread would return ~0 (1970).
        if base_us != 0 && elapsed_ns < CACHE_VALIDITY_NS {
            // Fast path: pure arithmetic on the cached base.
            return base_us + elapsed_ns / 1_000;
        }

        // Slow path: refresh from the wall clock.
        let wall_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before UNIX epoch (1970-01-01)")
            .as_micros() as u64;

        // Never step below what the fast path may already have returned.
        let floor_us = base_us.saturating_add(elapsed_ns / 1_000);
        let fresh_us = wall_us.max(floor_us);

        cache.set((Instant::now(), fresh_us));
        fresh_us
    })
}
112
/// Size in bytes of the fixed-width fields that precede the key/value payload:
/// length prefix (`u32`), record type (`u8`), transaction id (`u64`),
/// timestamp (`u64`), key length (`u32`) and value length (`u32`).
const RECORD_HEADER_SIZE: usize = std::mem::size_of::<u32>()
    + std::mem::size_of::<u8>()
    + 2 * std::mem::size_of::<u64>()
    + 2 * std::mem::size_of::<u32>();

/// Size in bytes of the trailing CRC32 checksum.
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

/// Initial capacity (32 KiB) of a transaction-local WAL buffer.
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 << 10;
121
122// =============================================================================
123// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
124// =============================================================================
125
/// In-memory staging area for the WAL records of a single transaction.
///
/// Records are serialized into a private byte vector as they are appended, so
/// appending never touches the WAL writer or its lock. The accumulated bytes
/// are handed to the WAL in one piece via `TxnWal::flush_buffer` (or the
/// `flush_to_wal` convenience wrapper), which takes the writer lock once.
///
/// ## Lock acquisitions
///
/// ```text
/// Writing each record straight to the WAL:
///   1000 writes -> 1000 writer-lock acquisitions
///
/// Staging in a TxnWalBuffer:
///   1000 appends -> 0 lock acquisitions
///   1 flush      -> 1 lock acquisition
/// ```
///
/// ## Usage
///
/// ```ignore
/// let mut buffer = TxnWalBuffer::new(txn_id);
///
/// // No locks are taken here
/// buffer.append(b"key1", b"value1");
/// buffer.append(b"key2", b"value2");
///
/// // One lock acquisition for the whole batch
/// wal.flush_buffer(&buffer)?;
/// ```
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction ID written into every buffered record
    txn_id: u64,
    /// Serialized records, back to back, in append order
    buffer: Vec<u8>,
    /// How many records `buffer` currently holds
    entry_count: usize,
}
167
168impl TxnWalBuffer {
169    /// Create a new buffer for a transaction
170    #[inline]
171    pub fn new(txn_id: u64) -> Self {
172        Self {
173            txn_id,
174            buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
175            entry_count: 0,
176        }
177    }
178
179    /// Create with specific capacity
180    #[inline]
181    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
182        Self {
183            txn_id,
184            buffer: Vec::with_capacity(capacity),
185            entry_count: 0,
186        }
187    }
188
189    /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
190    ///
191    /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
192    /// This is completely lock-free and does not touch the file system.
193    ///
194    /// Uses cached timestamps to eliminate per-write syscall overhead.
195    #[inline]
196    pub fn append(&mut self, key: &[u8], value: &[u8]) {
197        // Use cached timestamp instead of syscall (Recommendation 5)
198        let timestamp_us = cached_timestamp_us();
199
200        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
201        let entry_start = self.buffer.len();
202
203        // Reserve space for length prefix (will fill at end)
204        self.buffer.extend_from_slice(&[0u8; 4]);
205
206        let mut hasher = crc32fast::Hasher::new();
207
208        // Record type (Data = 0)
209        let record_type_byte = WalRecordType::Data as u8;
210        self.buffer.push(record_type_byte);
211        hasher.update(&[record_type_byte]);
212
213        // Transaction ID
214        let txn_bytes = self.txn_id.to_le_bytes();
215        self.buffer.extend_from_slice(&txn_bytes);
216        hasher.update(&txn_bytes);
217
218        // Timestamp
219        let ts_bytes = timestamp_us.to_le_bytes();
220        self.buffer.extend_from_slice(&ts_bytes);
221        hasher.update(&ts_bytes);
222
223        // Key length
224        let key_len_bytes = (key.len() as u32).to_le_bytes();
225        self.buffer.extend_from_slice(&key_len_bytes);
226        hasher.update(&key_len_bytes);
227
228        // Value length
229        let val_len_bytes = (value.len() as u32).to_le_bytes();
230        self.buffer.extend_from_slice(&val_len_bytes);
231        hasher.update(&val_len_bytes);
232
233        // Key data
234        self.buffer.extend_from_slice(key);
235        hasher.update(key);
236
237        // Value data
238        self.buffer.extend_from_slice(value);
239        hasher.update(value);
240
241        // CRC32 checksum
242        self.buffer
243            .extend_from_slice(&hasher.finalize().to_le_bytes());
244
245        // Fill in length prefix (content length, not including the 4-byte length field)
246        let content_len = (total_len - 4) as u32;
247        self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
248
249        self.entry_count += 1;
250    }
251
252    /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
253    ///
254    /// Acquires the WAL lock once and writes all accumulated entries.
255    /// Returns the first sequence number assigned to buffered entries.
256    ///
257    /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
258    /// as it properly handles the private writer field.
259    #[inline]
260    pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
261        wal.flush_buffer(self)
262    }
263
264    /// Clear the buffer (for reuse)
265    #[inline]
266    pub fn clear(&mut self) {
267        self.buffer.clear();
268        self.entry_count = 0;
269    }
270
271    /// Get number of buffered entries
272    #[inline]
273    pub fn entry_count(&self) -> usize {
274        self.entry_count
275    }
276
277    /// Get total bytes buffered
278    #[inline]
279    pub fn bytes_buffered(&self) -> usize {
280        self.buffer.len()
281    }
282
283    /// Check if buffer is empty
284    #[inline]
285    pub fn is_empty(&self) -> bool {
286        self.buffer.is_empty()
287    }
288}
289
290/// WAL entry for transaction-aware operations
291#[derive(Debug, Clone)]
292pub struct TxnWalEntry {
293    /// Record type
294    pub record_type: WalRecordType,
295    /// Transaction ID
296    pub txn_id: u64,
297    /// Timestamp in microseconds
298    pub timestamp_us: u64,
299    /// Key data
300    pub key: Vec<u8>,
301    /// Value data
302    pub value: Vec<u8>,
303}
304
305impl TxnWalEntry {
306    /// Create a new data entry
307    pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
308        Self {
309            record_type: WalRecordType::Data,
310            txn_id,
311            timestamp_us: Self::now_us(),
312            key,
313            value,
314        }
315    }
316
317    /// Create a transaction begin entry
318    pub fn txn_begin(txn_id: u64) -> Self {
319        Self {
320            record_type: WalRecordType::TxnBegin,
321            txn_id,
322            timestamp_us: Self::now_us(),
323            key: Vec::new(),
324            value: Vec::new(),
325        }
326    }
327
328    /// Create a transaction commit entry
329    pub fn txn_commit(txn_id: u64) -> Self {
330        Self {
331            record_type: WalRecordType::TxnCommit,
332            txn_id,
333            timestamp_us: Self::now_us(),
334            key: Vec::new(),
335            value: Vec::new(),
336        }
337    }
338
339    /// Create a transaction abort entry
340    pub fn txn_abort(txn_id: u64) -> Self {
341        Self {
342            record_type: WalRecordType::TxnAbort,
343            txn_id,
344            timestamp_us: Self::now_us(),
345            key: Vec::new(),
346            value: Vec::new(),
347        }
348    }
349
350    /// Create a checkpoint entry
351    pub fn checkpoint(txn_id: u64) -> Self {
352        Self {
353            record_type: WalRecordType::Checkpoint,
354            txn_id,
355            timestamp_us: Self::now_us(),
356            key: Vec::new(),
357            value: Vec::new(),
358        }
359    }
360
361    /// Create a schema change entry
362    pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
363        Self {
364            record_type: WalRecordType::SchemaChange,
365            txn_id,
366            timestamp_us: Self::now_us(),
367            key: Vec::new(),
368            value: schema_data,
369        }
370    }
371
372    /// Get current time in microseconds (uses cached timestamp)
373    #[inline]
374    fn now_us() -> u64 {
375        cached_timestamp_us()
376    }
377
378    /// Calculate CRC32 checksum for this entry
379    ///
380    /// Uses crc32fast for portable, deterministic checksums.
381    /// The checksum covers all fields except the checksum itself.
382    /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
383    pub fn checksum(&self) -> u32 {
384        let mut hasher = crc32fast::Hasher::new();
385        hasher.update(&[self.record_type as u8]);
386        hasher.update(&self.txn_id.to_le_bytes());
387        hasher.update(&self.timestamp_us.to_le_bytes());
388        hasher.update(&(self.key.len() as u32).to_le_bytes());
389        hasher.update(&(self.value.len() as u32).to_le_bytes());
390        hasher.update(&self.key);
391        hasher.update(&self.value);
392        hasher.finalize()
393    }
394
395    /// Serialize to bytes with single-pass CRC calculation
396    ///
397    /// Optimized to calculate CRC while building the buffer,
398    /// avoiding a second pass over all data.
399    pub fn to_bytes(&self) -> Vec<u8> {
400        let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
401        let mut buf = Vec::with_capacity(total_len);
402        let mut hasher = crc32fast::Hasher::new();
403
404        // Length (not including the length field itself) - not included in CRC
405        let content_len = (total_len - 4) as u32;
406        buf.extend_from_slice(&content_len.to_le_bytes());
407
408        // Record type
409        let record_type_byte = self.record_type as u8;
410        buf.push(record_type_byte);
411        hasher.update(&[record_type_byte]);
412
413        // Transaction ID
414        let txn_bytes = self.txn_id.to_le_bytes();
415        buf.extend_from_slice(&txn_bytes);
416        hasher.update(&txn_bytes);
417
418        // Timestamp
419        let ts_bytes = self.timestamp_us.to_le_bytes();
420        buf.extend_from_slice(&ts_bytes);
421        hasher.update(&ts_bytes);
422
423        // Key length
424        let key_len_bytes = (self.key.len() as u32).to_le_bytes();
425        buf.extend_from_slice(&key_len_bytes);
426        hasher.update(&key_len_bytes);
427
428        // Value length
429        let val_len_bytes = (self.value.len() as u32).to_le_bytes();
430        buf.extend_from_slice(&val_len_bytes);
431        hasher.update(&val_len_bytes);
432
433        // Key data
434        buf.extend_from_slice(&self.key);
435        hasher.update(&self.key);
436
437        // Value data
438        buf.extend_from_slice(&self.value);
439        hasher.update(&self.value);
440
441        // CRC32 Checksum (computed in single pass above)
442        buf.extend_from_slice(&hasher.finalize().to_le_bytes());
443
444        buf
445    }
446
447    /// Deserialize from bytes with torn write detection
448    ///
449    /// Returns error if:
450    /// - Length field indicates data is too short
451    /// - CRC32 checksum mismatch (corruption or torn write)
452    /// - Invalid record type
453    pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
454        // Length (allows torn write detection - if length claims more data than exists)
455        let content_len = reader.read_u32::<LittleEndian>()?;
456        if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
457            return Err(SochDBError::Corruption("WAL entry too short".into()));
458        }
459
460        // Record type
461        let record_type_byte = reader.read_u8()?;
462        let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
463            SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
464        })?;
465
466        // Transaction ID
467        let txn_id = reader.read_u64::<LittleEndian>()?;
468
469        // Timestamp
470        let timestamp_us = reader.read_u64::<LittleEndian>()?;
471
472        // Key length
473        let key_len = reader.read_u32::<LittleEndian>()? as usize;
474
475        // Value length
476        let value_len = reader.read_u32::<LittleEndian>()? as usize;
477
478        // Key data
479        let mut key = vec![0u8; key_len];
480        reader.read_exact(&mut key)?;
481
482        // Value data
483        let mut value = vec![0u8; value_len];
484        reader.read_exact(&mut value)?;
485
486        // CRC32 Checksum
487        let stored_checksum = reader.read_u32::<LittleEndian>()?;
488
489        let entry = Self {
490            record_type,
491            txn_id,
492            timestamp_us,
493            key,
494            value,
495        };
496
497        // Verify checksum - detects both corruption and torn writes
498        if entry.checksum() != stored_checksum {
499            return Err(SochDBError::Corruption(format!(
500                "WAL checksum mismatch for txn_id {}: expected {}, got {}",
501                txn_id,
502                entry.checksum(),
503                stored_checksum
504            )));
505        }
506
507        Ok(entry)
508    }
509}
510
511/// Transaction-aware Write-Ahead Log
512pub struct TxnWal {
513    /// Path to WAL file
514    path: PathBuf,
515    /// Buffered writer
516    writer: Mutex<BufWriter<File>>,
517    /// Next transaction ID
518    next_txn_id: AtomicU64,
519    /// Write sequence number
520    sequence: AtomicU64,
521    /// Bytes written since last sync
522    bytes_since_sync: AtomicU64,
523    /// Cached timestamp (microseconds since epoch)
524    /// Updated periodically to avoid syscall per write
525    cached_timestamp_us: AtomicU64,
526}
527
528impl TxnWal {
529    /// Create a new WAL or open existing one
530    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
531        let path = path.as_ref().to_path_buf();
532
533        // Ensure parent directory exists
534        if let Some(parent) = path.parent() {
535            std::fs::create_dir_all(parent)?;
536        }
537
538        let file = OpenOptions::new()
539            .create(true)
540            .append(true)
541            .read(true)
542            .open(&path)?;
543
544        // Use 256KB buffer for better batch performance (default is 8KB)
545        // This reduces system calls when buffering many small writes
546        let now_us = cached_timestamp_us();
547
548        let wal = Self {
549            path,
550            writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
551            next_txn_id: AtomicU64::new(1),
552            sequence: AtomicU64::new(0),
553            bytes_since_sync: AtomicU64::new(0),
554            cached_timestamp_us: AtomicU64::new(now_us),
555        };
556
557        // Recover state from existing WAL
558        wal.recover_state()?;
559
560        Ok(wal)
561    }
562
563    /// Recover state (next txn ID, sequence) from existing WAL
564    ///
565    /// To avoid txn_id collisions when multiple processes open the same
566    /// WAL concurrently, we incorporate the PID into the starting txn_id.
567    /// Format: upper 32 bits = PID, lower 32 bits = counter.
568    /// This guarantees uniqueness across processes without coordination.
569    fn recover_state(&self) -> Result<()> {
570        let file = File::open(&self.path)?;
571        let mut reader = BufReader::new(file);
572        let mut count: u64 = 0;
573
574        // Track the max counter (lower 32 bits) for OUR PID only.
575        // Each process owns its own txn_id space: upper 32 bits = PID.
576        // We must NOT use max_txn_id across ALL PIDs, because that would
577        // place us into another PID's ID space and cause collisions.
578        let our_pid = std::process::id() as u64;
579        let pid_base = our_pid << 32;
580        let mut max_our_counter: u64 = 0;
581
582        loop {
583            match TxnWalEntry::from_reader(&mut reader) {
584                Ok(entry) => {
585                    count += 1;
586                    // Only track counters from entries that belong to our PID
587                    let entry_pid = entry.txn_id >> 32;
588                    if entry_pid == our_pid {
589                        let entry_counter = entry.txn_id & 0xFFFF_FFFF;
590                        if entry_counter > max_our_counter {
591                            max_our_counter = entry_counter;
592                        }
593                    }
594                }
595                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
596                    break;
597                }
598                Err(_) => {
599                    // Ignore corruption at end of WAL (incomplete write)
600                    break;
601                }
602            }
603        }
604
605        // Start from pid_base + (max counter we've seen for our PID + 1).
606        // This ensures:
607        //   1. Unique across processes (different PID → different upper 32 bits)
608        //   2. Unique within this process even if PID is recycled (we skip
609        //      over any counters already used by a previous process with our PID)
610        let next_id = pid_base + max_our_counter + 1;
611
612        self.next_txn_id.store(next_id, Ordering::SeqCst);
613        self.sequence.store(count, Ordering::SeqCst);
614
615        Ok(())
616    }
617
618    /// Get cached timestamp, updating if stale (>1ms old)
619    ///
620    /// This avoids a syscall per write by caching the timestamp.
621    /// For WAL purposes, ~1ms granularity is sufficient.
622    #[inline]
623    fn get_cached_timestamp(&self) -> u64 {
624        // Fast path: use cached value (no syscall)
625        let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
626
627        // Refresh occasionally (every ~1000 writes or when sequence wraps)
628        // This is a very cheap check since sequence is incremented atomically anyway
629        let seq = self.sequence.load(Ordering::Relaxed);
630        if seq & 0x3FF == 0 {
631            // Refresh every 1024 writes
632            let now_us = cached_timestamp_us();
633            self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
634            return now_us;
635        }
636
637        cached
638    }
639
640    /// Append an entry to the WAL
641    ///
642    /// Returns the sequence number of this write.
643    ///
644    /// # Durability Contract
645    ///
646    /// This method writes data to the BufWriter but does **NOT** fsync.
647    /// The data is NOT durable until `flush()` + `sync()` are called.
648    ///
649    /// **For transaction commits, use `commit_transaction()` or `commit_durable()`**
650    /// which enforce fsync. This method is intentionally non-durable for
651    /// batching data writes within a transaction (pre-commit records).
652    ///
653    /// The group commit path (`EventDrivenGroupCommit`) is responsible for
654    /// calling `flush()` + `sync()` after batching multiple commit records.
655    pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
656        let bytes = entry.to_bytes();
657        let mut writer = self.writer.lock();
658
659        writer.write_all(&bytes)?;
660        // Don't flush here - BufWriter will batch writes automatically.
661        // Call flush() explicitly before sync() or commit().
662
663        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
664        self.bytes_since_sync
665            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
666
667        Ok(seq)
668    }
669
670    /// Append entry without flushing (for batched writes)
671    ///
672    /// Caller must call flush() or sync() afterward to ensure durability.
673    #[inline]
674    pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
675        let bytes = entry.to_bytes();
676        let mut writer = self.writer.lock();
677
678        writer.write_all(&bytes)?;
679        // No flush - let BufWriter buffer the writes
680
681        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
682        self.bytes_since_sync
683            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
684
685        Ok(seq)
686    }
687
688    /// Write data without flushing (for batched writes within a transaction)
689    #[inline]
690    pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
691        let entry = TxnWalEntry::data(txn_id, key, value);
692        self.append_no_flush(&entry)
693    }
694
695    /// Write data from slices without any allocation
696    ///
697    /// This is the fastest path for writing - no intermediate Vec allocations.
698    /// Calculates CRC32 while serializing directly to BufWriter.
699    #[inline]
700    pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
701        // Use coarse timestamp (cached every ~1ms) instead of syscall per write
702        let timestamp_us = self.get_cached_timestamp();
703
704        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
705        let mut hasher = crc32fast::Hasher::new();
706
707        let mut writer = self.writer.lock();
708
709        // Length (not included in CRC)
710        let content_len = (total_len - 4) as u32;
711        writer.write_all(&content_len.to_le_bytes())?;
712
713        // Record type
714        let record_type_byte = WalRecordType::Data as u8;
715        writer.write_all(&[record_type_byte])?;
716        hasher.update(&[record_type_byte]);
717
718        // Transaction ID
719        let txn_bytes = txn_id.to_le_bytes();
720        writer.write_all(&txn_bytes)?;
721        hasher.update(&txn_bytes);
722
723        // Timestamp
724        let ts_bytes = timestamp_us.to_le_bytes();
725        writer.write_all(&ts_bytes)?;
726        hasher.update(&ts_bytes);
727
728        // Key length
729        let key_len_bytes = (key.len() as u32).to_le_bytes();
730        writer.write_all(&key_len_bytes)?;
731        hasher.update(&key_len_bytes);
732
733        // Value length
734        let val_len_bytes = (value.len() as u32).to_le_bytes();
735        writer.write_all(&val_len_bytes)?;
736        hasher.update(&val_len_bytes);
737
738        // Key data
739        writer.write_all(key)?;
740        hasher.update(key);
741
742        // Value data
743        writer.write_all(value)?;
744        hasher.update(value);
745
746        // CRC32 checksum
747        writer.write_all(&hasher.finalize().to_le_bytes())?;
748
749        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
750        self.bytes_since_sync
751            .fetch_add(total_len as u64, Ordering::Relaxed);
752
753        Ok(seq)
754    }
755
756    /// Flush pending writes to kernel buffer (but not to disk)
757    pub fn flush(&self) -> Result<()> {
758        let mut writer = self.writer.lock();
759        writer.flush()?;
760        Ok(())
761    }
762
763    /// Append and immediately sync for durability
764    pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
765        let seq = self.append(entry)?;
766        self.sync()?;
767        Ok(seq)
768    }
769
770    /// Force sync to disk
771    pub fn sync(&self) -> Result<()> {
772        let writer = self.writer.lock();
773        writer.get_ref().sync_all()?;
774        self.bytes_since_sync.store(0, Ordering::Relaxed);
775        Ok(())
776    }
777
778    /// Flush a TxnWalBuffer with single lock acquisition
779    ///
780    /// This is the high-performance path for transaction commit:
781    /// - All writes during the transaction are buffered locally
782    /// - At commit, this method flushes everything with ONE lock
783    ///
784    /// ## Performance
785    ///
786    /// ```text
787    /// 1000 writes with individual flush: 1000 × lock overhead
788    /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
789    /// Speedup: ~1000× for lock overhead
790    /// ```
791    #[inline]
792    pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
793        if buffer.is_empty() {
794            return Ok(0);
795        }
796
797        let mut writer = self.writer.lock();
798        writer.write_all(&buffer.buffer)?;
799
800        let seq = self
801            .sequence
802            .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
803        self.bytes_since_sync
804            .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
805
806        Ok(seq)
807    }
808
809    /// Get the current size of the WAL file in bytes
810    pub fn size_bytes(&self) -> u64 {
811        std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
812    }
813
814    /// Allocate a new transaction ID
815    pub fn alloc_txn_id(&self) -> u64 {
816        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
817    }
818
819    /// Begin a new transaction
820    pub fn begin_transaction(&self) -> Result<u64> {
821        let txn_id = self.alloc_txn_id();
822        let entry = TxnWalEntry::txn_begin(txn_id);
823        self.append(&entry)?;
824        Ok(txn_id)
825    }
826
827    /// Commit a transaction (with fsync for durability)
828    ///
829    /// This flushes all pending writes and then fsyncs the commit record.
830    ///
831    /// # Durability Guarantee
832    ///
833    /// After this method returns `Ok(())`, the commit record is durable on disk.
834    /// The transaction is guaranteed to survive process crash, OS crash, and
835    /// power failure (assuming the storage device honors fsync correctly).
836    ///
837    /// # Performance
838    ///
839    /// Each call performs an fsync (~5ms on HDD, ~0.1ms on NVMe).
840    /// For high-throughput workloads, use `EventDrivenGroupCommit` which
841    /// batches multiple commits into a single fsync via `commit_durable_batch()`.
842    pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
843        // First flush any pending buffered writes
844        self.flush()?;
845
846        // Then write commit record with fsync
847        let entry = TxnWalEntry::txn_commit(txn_id);
848        self.append_sync(&entry)?;
849        Ok(())
850    }
851
852    /// Commit a batch of transactions with a single fsync (group commit).
853    ///
854    /// Writes commit records for all transaction IDs, then performs a single
855    /// flush + fsync. This amortizes the fsync cost across N transactions,
856    /// achieving ~N× throughput improvement over individual commits.
857    ///
858    /// # Durability Guarantee
859    ///
860    /// After this method returns `Ok(())`, ALL transactions in the batch
861    /// are durable on disk. Either all commit records are visible after
862    /// crash recovery, or none are (atomic batch durability).
863    ///
864    /// # Usage
865    ///
866    /// This method is called by `EventDrivenGroupCommit::flush_fn` to
867    /// implement the group commit pattern. Do not call directly unless
868    /// you are implementing your own commit batching.
869    pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
870        // Write all commit records without flushing (batch them in BufWriter)
871        for &txn_id in txn_ids {
872            let entry = TxnWalEntry::txn_commit(txn_id);
873            self.append_no_flush(&entry)?;
874        }
875
876        // Single flush + fsync for the entire batch
877        self.flush()?;
878        self.sync()?;
879        Ok(())
880    }
881
882    /// Abort a transaction
883    pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
884        let entry = TxnWalEntry::txn_abort(txn_id);
885        self.append(&entry)?;
886        Ok(())
887    }
888
889    /// Write data within a transaction
890    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
891        let entry = TxnWalEntry::data(txn_id, key, value);
892        self.append(&entry)
893    }
894
895    /// Replay WAL for crash recovery
896    ///
897    /// Returns (committed_writes, recovered_txn_count)
898    #[allow(clippy::type_complexity)]
899    pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
900        let file = File::open(&self.path)?;
901        let mut reader = BufReader::new(file);
902
903        let mut committed_txns: HashSet<u64> = HashSet::new();
904        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
905            std::collections::HashMap::new();
906
907        // Single pass: collect all entries
908        loop {
909            match TxnWalEntry::from_reader(&mut reader) {
910                Ok(entry) => match entry.record_type {
911                    WalRecordType::TxnBegin => {
912                        // Use entry() to avoid overwriting existing data if a
913                        // duplicate TxnBegin appears (e.g., from PID recycling).
914                        pending_writes
915                            .entry(entry.txn_id)
916                            .or_insert_with(Vec::new);
917                    }
918                    WalRecordType::Data => {
919                        // Accept data for any txn_id we've seen a Begin for,
920                        // and also for txn_ids without a Begin (they might have
921                        // their Begin later in the WAL due to buffered writes).
922                        pending_writes
923                            .entry(entry.txn_id)
924                            .or_insert_with(Vec::new)
925                            .push((entry.key, entry.value));
926                    }
927                    WalRecordType::TxnCommit => {
928                        committed_txns.insert(entry.txn_id);
929                    }
930                    WalRecordType::TxnAbort => {
931                        pending_writes.remove(&entry.txn_id);
932                    }
933                    _ => {}
934                },
935                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
936                    break;
937                }
938                Err(_) => {
939                    break;
940                }
941            }
942        }
943
944        // Collect committed writes
945        let mut result = Vec::new();
946        let mut txn_count = 0;
947
948        for (txn_id, writes) in pending_writes {
949            if committed_txns.contains(&txn_id) {
950                result.extend(writes);
951                txn_count += 1;
952            }
953            // Uncommitted transactions are discarded (rollback)
954        }
955
956        Ok((result, txn_count))
957    }
958
959    /// Replay WAL with a callback
960    pub fn replay<F>(&self, mut callback: F) -> Result<u64>
961    where
962        F: FnMut(TxnWalEntry) -> Result<()>,
963    {
964        let file = File::open(&self.path)?;
965        let mut reader = BufReader::new(file);
966        let mut count = 0u64;
967
968        loop {
969            match TxnWalEntry::from_reader(&mut reader) {
970                Ok(entry) => {
971                    callback(entry)?;
972                    count += 1;
973                }
974                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
975                    break;
976                }
977                Err(e) => {
978                    // Log warning but continue (may be incomplete entry at end)
979                    eprintln!("WAL replay warning: {:?}", e);
980                    break;
981                }
982            }
983        }
984
985        Ok(count)
986    }
987
988    /// Truncate WAL (called after successful checkpoint)
989    ///
990    /// Flushes any buffered writes, truncates the file to 0 bytes,
991    /// and resets sequence counters. The file is opened in `O_APPEND`
992    /// mode so subsequent writes will correctly start at offset 0.
993    ///
994    /// **WARNING**: After truncation, all data durability is lost.
995    /// The in-memory memtable still holds data for the current session,
996    /// but a crash after truncation means the data cannot be recovered
997    /// from the WAL.
998    pub fn truncate(&self) -> Result<()> {
999        let mut writer = self.writer.lock();
1000        // Flush BufWriter so no stale data is written after truncation
1001        writer.flush()?;
1002        let file = writer.get_ref();
1003        file.set_len(0)?;
1004        file.sync_all()?;
1005        self.sequence.store(0, Ordering::SeqCst);
1006        self.bytes_since_sync.store(0, Ordering::Relaxed);
1007        Ok(())
1008    }
1009
1010    /// Write a checkpoint marker
1011    pub fn write_checkpoint(&self) -> Result<u64> {
1012        let entry = TxnWalEntry::checkpoint(0);
1013        self.append_sync(&entry)
1014    }
1015
1016    /// Write a Compensation Log Record (CLR) for undo operations
1017    ///
1018    /// CLRs are redo-only records that skip past already undone operations
1019    /// during recovery. The undo_next_lsn field tells recovery where to
1020    /// continue undoing after this CLR.
1021    pub fn append_clr(
1022        &self,
1023        txn_id: u64,
1024        _original_lsn: u64,
1025        undo_next_lsn: Option<u64>,
1026        undo_data: &[u8],
1027    ) -> Result<u64> {
1028        // Encode undo_next_lsn into the key field
1029        let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1030        let entry = TxnWalEntry {
1031            record_type: WalRecordType::CompensationLogRecord,
1032            txn_id,
1033            timestamp_us: TxnWalEntry::now_us(),
1034            key, // undo_next_lsn encoded in key
1035            value: undo_data.to_vec(),
1036        };
1037        self.append(&entry)
1038    }
1039
1040    /// Write checkpoint with data (for fuzzy checkpoints)
1041    pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1042        let entry = TxnWalEntry {
1043            record_type: WalRecordType::Checkpoint,
1044            txn_id: 0,
1045            timestamp_us: TxnWalEntry::now_us(),
1046            key: Vec::new(),
1047            value: checkpoint_data.to_vec(),
1048        };
1049        self.append_sync(&entry)
1050    }
1051
1052    /// Write checkpoint end with captured state
1053    pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1054        let entry = TxnWalEntry {
1055            record_type: WalRecordType::CheckpointEnd,
1056            txn_id: 0,
1057            timestamp_us: TxnWalEntry::now_us(),
1058            key: Vec::new(),
1059            value: checkpoint_data.to_vec(),
1060        };
1061        self.append_sync(&entry)
1062    }
1063
1064    /// Get current sequence number
1065    pub fn sequence(&self) -> u64 {
1066        self.sequence.load(Ordering::SeqCst)
1067    }
1068
1069    /// Get bytes written since last sync
1070    pub fn bytes_since_sync(&self) -> u64 {
1071        self.bytes_since_sync.load(Ordering::Relaxed)
1072    }
1073
1074    /// Get path to WAL file
1075    pub fn path(&self) -> &Path {
1076        &self.path
1077    }
1078}
1079
/// Point-in-time counters describing a WAL.
///
/// All fields default to zero.
#[derive(Clone, Debug, Default)]
pub struct TxnWalStats {
    /// Value of the WAL's sequence counter, i.e. how many entries have been
    /// written.
    pub entries_written: u64,
    /// Bytes written since the most recent sync.
    pub bytes_since_sync: u64,
    /// Value of the transaction ID counter (the next ID to be handed out).
    pub next_txn_id: u64,
}
1090
1091// ============================================================================
1092// Sharded WAL for Reduced Mutex Contention
1093// ============================================================================
1094
1095/// Sharded Write-Ahead Log for high-concurrency workloads
1096///
1097/// Instead of a single Mutex<File>, uses multiple shards to reduce contention:
1098/// - Writers hash to shard by txn_id
1099/// - Each shard has its own buffer
1100/// - Central coordinator handles fsync ordering
1101///
1102/// Reduces contention from O(1) bottleneck to O(num_shards) parallelism.
1103#[allow(dead_code)]
1104pub struct ShardedWal {
1105    /// Shard writers (txn_id % num_shards selects shard)
1106    shards: Vec<parking_lot::Mutex<WalShard>>,
1107    /// Number of shards (power of 2)
1108    num_shards: usize,
1109    /// Central WAL file for ordered writes
1110    central_writer: parking_lot::Mutex<BufWriter<File>>,
1111    /// Next transaction ID
1112    next_txn_id: AtomicU64,
1113    /// Write sequence (global ordering)
1114    sequence: AtomicU64,
1115    /// Path
1116    path: PathBuf,
1117}
1118
/// In-memory staging buffer for one shard of a `ShardedWal`.
struct WalShard {
    /// Serialized entries waiting to be flushed, concatenated back to back.
    buffer: Vec<u8>,
    /// How many entries are currently held in `buffer`.
    entry_count: usize,
}
1126
1127impl WalShard {
1128    fn new() -> Self {
1129        Self {
1130            buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1131            entry_count: 0,
1132        }
1133    }
1134
1135    fn append(&mut self, entry: &TxnWalEntry) {
1136        let bytes = entry.to_bytes();
1137        self.buffer.extend_from_slice(&bytes);
1138        self.entry_count += 1;
1139    }
1140
1141    fn is_empty(&self) -> bool {
1142        self.buffer.is_empty()
1143    }
1144
1145    fn drain(&mut self) -> Vec<u8> {
1146        self.entry_count = 0;
1147        std::mem::take(&mut self.buffer)
1148    }
1149}
1150
1151impl ShardedWal {
1152    /// Create sharded WAL with specified number of shards
1153    ///
1154    /// Recommended: 4-8 shards for typical server workloads
1155    pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1156        let path = path.as_ref().to_path_buf();
1157
1158        if let Some(parent) = path.parent() {
1159            std::fs::create_dir_all(parent)?;
1160        }
1161
1162        let file = std::fs::OpenOptions::new()
1163            .create(true)
1164            .append(true)
1165            .read(true)
1166            .open(&path)?;
1167
1168        // Round up to power of 2 for fast modulo
1169        let num_shards = num_shards.next_power_of_two();
1170        let shards: Vec<_> = (0..num_shards)
1171            .map(|_| parking_lot::Mutex::new(WalShard::new()))
1172            .collect();
1173
1174        Ok(Self {
1175            shards,
1176            num_shards,
1177            central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1178            next_txn_id: AtomicU64::new(1),
1179            sequence: AtomicU64::new(0),
1180            path,
1181        })
1182    }
1183
1184    /// Get shard index for transaction
1185    #[inline]
1186    fn shard_idx(&self, txn_id: u64) -> usize {
1187        (txn_id as usize) & (self.num_shards - 1)
1188    }
1189
1190    /// Append entry to appropriate shard (lock-free for different txns)
1191    pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1192        let shard_idx = self.shard_idx(entry.txn_id);
1193        let mut shard = self.shards[shard_idx].lock();
1194        shard.append(entry);
1195        self.sequence.fetch_add(1, Ordering::SeqCst)
1196    }
1197
1198    /// Allocate transaction ID
1199    pub fn alloc_txn_id(&self) -> u64 {
1200        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1201    }
1202
1203    /// Flush all shard buffers to central file
1204    pub fn flush(&self) -> Result<()> {
1205        let mut central = self.central_writer.lock();
1206
1207        // Collect all shard buffers (brief lock per shard)
1208        for shard in &self.shards {
1209            let mut shard_guard = shard.lock();
1210            if !shard_guard.is_empty() {
1211                let data = shard_guard.drain();
1212                central.write_all(&data)?;
1213            }
1214        }
1215
1216        central.flush()?;
1217        Ok(())
1218    }
1219
1220    /// Sync to disk (fsync)
1221    pub fn sync(&self) -> Result<()> {
1222        self.flush()?;
1223        let central = self.central_writer.lock();
1224        central.get_ref().sync_all()?;
1225        Ok(())
1226    }
1227
1228    /// Begin transaction
1229    pub fn begin_transaction(&self) -> Result<u64> {
1230        let txn_id = self.alloc_txn_id();
1231        let entry = TxnWalEntry::txn_begin(txn_id);
1232        self.append(&entry);
1233        Ok(txn_id)
1234    }
1235
1236    /// Write data
1237    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1238        let entry = TxnWalEntry::data(txn_id, key, value);
1239        Ok(self.append(&entry))
1240    }
1241
1242    /// Commit transaction
1243    pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1244        let entry = TxnWalEntry::txn_commit(txn_id);
1245        let seq = self.append(&entry);
1246        self.sync()?; // Fsync on commit for durability
1247        Ok(seq)
1248    }
1249
1250    /// Get statistics
1251    pub fn stats(&self) -> ShardedWalStats {
1252        let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1253        for shard in &self.shards {
1254            shard_entry_counts.push(shard.lock().entry_count);
1255        }
1256
1257        ShardedWalStats {
1258            num_shards: self.num_shards,
1259            total_entries: self.sequence.load(Ordering::SeqCst),
1260            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1261            shard_entry_counts,
1262        }
1263    }
1264}
1265
/// Counters reported by `ShardedWal::stats`.
#[derive(Clone, Debug)]
pub struct ShardedWalStats {
    /// Number of shards in the WAL.
    pub num_shards: usize,
    /// Value of the global sequence counter (entries appended so far).
    pub total_entries: u64,
    /// Value of the transaction ID counter (the next ID to be handed out).
    pub next_txn_id: u64,
    /// Entries currently buffered in each shard, indexed by shard.
    pub shard_entry_counts: Vec<usize>,
}
1274
/// Figures collected during one crash-recovery pass over the WAL.
///
/// All fields default to zero.
#[derive(Clone, Debug, Default)]
pub struct CrashRecoveryStats {
    /// How many records were successfully read from the WAL.
    pub total_records: u64,
    /// Transactions whose writes were recovered (committed).
    pub committed_txns: u64,
    /// Transactions that never committed and were therefore rolled back.
    pub rolled_back_txns: u64,
    /// Transactions that carry an explicit abort record.
    pub aborted_txns: u64,
    /// Total data writes recovered from committed transactions.
    pub recovered_writes: u64,
    /// Torn/corrupted records found at the end of the log (expected after a
    /// crash).
    pub torn_records: u64,
    /// Bytes read from the WAL.
    pub bytes_read: u64,
    /// Wall-clock duration of the recovery, in microseconds.
    pub recovery_duration_us: u64,
    /// Largest transaction ID encountered; use it to restart the ID counter.
    pub max_txn_id: u64,
}
1297
1298impl TxnWal {
1299    /// Get WAL statistics
1300    pub fn stats(&self) -> TxnWalStats {
1301        TxnWalStats {
1302            entries_written: self.sequence.load(Ordering::SeqCst),
1303            bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1304            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1305        }
1306    }
1307
1308    /// Full crash recovery with detailed statistics
1309    ///
1310    /// This method provides ACID recovery guarantees:
1311    /// 1. **Atomicity**: Uncommitted transactions are rolled back
1312    /// 2. **Durability**: All committed transactions are replayed
1313    /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1314    ///
1315    /// Returns (committed_writes, stats) where committed_writes contains
1316    /// all key-value pairs from committed transactions in order.
1317    #[allow(clippy::type_complexity)]
1318    pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1319        let start_time = std::time::Instant::now();
1320        let file = File::open(&self.path)?;
1321        let file_size = file.metadata()?.len();
1322        let mut reader = BufReader::new(file);
1323
1324        let mut stats = CrashRecoveryStats {
1325            bytes_read: file_size,
1326            ..Default::default()
1327        };
1328
1329        let mut committed_txns: HashSet<u64> = HashSet::new();
1330        let mut aborted_txns: HashSet<u64> = HashSet::new();
1331        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1332            std::collections::HashMap::new();
1333        let mut all_txns: HashSet<u64> = HashSet::new();
1334
1335        // Read all records, stopping at first corruption (torn write)
1336        loop {
1337            match TxnWalEntry::from_reader(&mut reader) {
1338                Ok(entry) => {
1339                    stats.total_records += 1;
1340                    if entry.txn_id > stats.max_txn_id {
1341                        stats.max_txn_id = entry.txn_id;
1342                    }
1343
1344                    match entry.record_type {
1345                        WalRecordType::TxnBegin => {
1346                            pending_writes.insert(entry.txn_id, Vec::new());
1347                            all_txns.insert(entry.txn_id);
1348                        }
1349                        WalRecordType::Data => {
1350                            if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1351                                writes.push((entry.key, entry.value));
1352                            }
1353                        }
1354                        WalRecordType::TxnCommit => {
1355                            committed_txns.insert(entry.txn_id);
1356                        }
1357                        WalRecordType::TxnAbort => {
1358                            pending_writes.remove(&entry.txn_id);
1359                            aborted_txns.insert(entry.txn_id);
1360                        }
1361                        _ => {}
1362                    }
1363                }
1364                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1365                    // Clean EOF
1366                    break;
1367                }
1368                Err(_) => {
1369                    // Corruption or torn write - stop here
1370                    stats.torn_records += 1;
1371                    break;
1372                }
1373            }
1374        }
1375
1376        // Collect committed writes
1377        let mut result = Vec::new();
1378        for (txn_id, writes) in &pending_writes {
1379            if committed_txns.contains(txn_id) {
1380                stats.committed_txns += 1;
1381                stats.recovered_writes += writes.len() as u64;
1382                result.extend(writes.clone());
1383            }
1384        }
1385
1386        // Count aborted and rolled-back transactions
1387        stats.aborted_txns = aborted_txns.len() as u64;
1388        stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1389
1390        stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1391
1392        Ok((result, stats))
1393    }
1394}
1395
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build an owned key/value pair from two byte slices.
    fn pair(key: &[u8], value: &[u8]) -> (Vec<u8>, Vec<u8>) {
        (key.to_vec(), value.to_vec())
    }

    /// An entry serialized with `to_bytes` decodes back to the same fields.
    #[test]
    fn test_wal_entry_roundtrip() {
        let original = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());

        let mut cursor = std::io::Cursor::new(original.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut cursor).unwrap();

        assert_eq!(decoded.record_type, WalRecordType::Data);
        assert_eq!(decoded.txn_id, 42);
        assert_eq!(decoded.key, b"key");
        assert_eq!(decoded.value, b"value");
    }

    /// A committed transaction's writes come back, in order, after reopening.
    #[test]
    fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Phase 1: log one committed transaction with two writes.
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn_id = wal.begin_transaction().unwrap();
            wal.write(txn_id, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn_id, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn_id).unwrap();
        }

        // Phase 2: reopen and replay.
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 1);
            assert_eq!(writes.len(), 2);
            assert_eq!(writes[0], pair(b"k1", b"v1"));
            assert_eq!(writes[1], pair(b"k2", b"v2"));
        }
    }

    /// Writes of a transaction that never committed are dropped on replay.
    #[test]
    fn test_uncommitted_transaction_rollback() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // First transaction: committed.
            let committed = wal.begin_transaction().unwrap();
            wal.write(committed, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(committed).unwrap();

            // Second transaction: left open, as if the process crashed.
            let dangling = wal.begin_transaction().unwrap();
            wal.write(dangling, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            // Only the committed transaction survives.
            assert_eq!(txn_count, 1);
            assert_eq!(writes.len(), 1);
            assert_eq!(writes[0], pair(b"committed", b"yes"));
        }
    }

    /// An explicitly aborted transaction contributes nothing on replay.
    #[test]
    fn test_aborted_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(txn).unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 0);
            assert!(writes.is_empty());
        }
    }

    /// Flipping the bits of the last serialized byte makes decoding fail.
    #[test]
    fn test_checksum_validation() {
        let mut bytes = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec()).to_bytes();

        // Damage the trailing byte of the record.
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;

        let mut cursor = std::io::Cursor::new(bytes);
        assert!(TxnWalEntry::from_reader(&mut cursor).is_err());
    }

    /// Mixed workload: two commits, one abort, one dangling transaction.
    #[test]
    fn test_crash_recovery_with_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // Committed, two writes.
            let first = wal.begin_transaction().unwrap();
            wal.write(first, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(first, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(first).unwrap();

            // Aborted.
            let aborted = wal.begin_transaction().unwrap();
            wal.write(aborted, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(aborted).unwrap();

            // Committed, one write.
            let second = wal.begin_transaction().unwrap();
            wal.write(second, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(second).unwrap();

            // Never committed: stands in for a crash mid-transaction.
            let dangling = wal.begin_transaction().unwrap();
            wal.write(dangling, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            // Three writes in total from the two committed transactions.
            assert_eq!(writes.len(), 3);
            assert_eq!(stats.committed_txns, 2);
            assert_eq!(stats.aborted_txns, 1);
            assert_eq!(stats.rolled_back_txns, 1); // the dangling transaction
            assert_eq!(stats.recovered_writes, 3);
            assert!(stats.recovery_duration_us > 0);
        }
    }

    /// Garbage after the last valid record is reported as one torn record and
    /// does not stop the valid prefix from being recovered.
    #[test]
    fn test_torn_write_detection() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // A complete, committed transaction.
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        // Tack a partial record onto the file to mimic a torn write.
        {
            use std::io::Write;
            let mut raw = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            raw.write_all(&[0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF])
                .unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            assert_eq!(writes.len(), 1);
            assert_eq!(stats.committed_txns, 1);
            assert_eq!(stats.torn_records, 1);
        }
    }

    /// Checksums depend only on entry content and survive a roundtrip.
    #[test]
    fn test_crc32_determinism() {
        // Two entries with identical content; the timestamp is pinned so it
        // cannot differ between them.
        let mut first = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        first.timestamp_us = 12345;

        let mut twin = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        twin.timestamp_us = 12345;

        assert_eq!(first.checksum(), twin.checksum());

        // Changing the value must change the checksum.
        let mut altered = TxnWalEntry::data(42, b"key".to_vec(), b"different".to_vec());
        altered.timestamp_us = 12345;
        assert_ne!(first.checksum(), altered.checksum());

        // Serialize and decode: the checksum is unchanged.
        let mut cursor = std::io::Cursor::new(first.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut cursor).unwrap();
        assert_eq!(decoded.checksum(), first.checksum());
    }
}