sochdb_storage/
txn_wal.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Transaction-Aware WAL for ACID Transactions
//!
//! This WAL implementation provides ACID transaction support:
//! - Atomicity: All writes in a transaction are logged together
//! - Consistency: Schema validation before commit
//! - Isolation: Transaction IDs for MVCC
//! - Durability: fsync after commit
//!
//! ## WAL Record Types
//!
//! - Data: Key-value write
//! - TxnBegin: Start of transaction
//! - TxnCommit: Commit point (durability guarantee)
//! - TxnAbort: Transaction rollback
//! - Checkpoint: Snapshot marker for recovery
//! - SchemaChange: DDL operations
//! - CompensationLogRecord: Undo record (CLR) written while rolling back
//!
//! ## Record Format (Fixed Layout for Torn Write Detection)
//!
//! ```text
//! ┌───────────┬──────────┬─────────┬───────────┬─────────┬───────────┬─────────┬─────────┬──────────┐
//! │ Length(4) │ Type(1)  │ TxnId(8)│ Timestamp │ KeyLen  │ ValueLen  │ Key(*)  │ Value(*)│ CRC32(4) │
//! │           │          │         │   (8)     │  (4)    │   (4)     │         │         │          │
//! └───────────┴──────────┴─────────┴───────────┴─────────┴───────────┴─────────┴─────────┴──────────┘
//! ```
//!
//! All integers are little-endian. `Length` counts every byte that follows it;
//! the CRC32 covers `Type` through `Value` (it does not cover `Length`).
//!
//! ## Crash Recovery Guarantees
//!
//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
//! 2. **Atomic Commit**: Commit record with fsync ensures durability
//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
47
48use byteorder::{LittleEndian, ReadBytesExt};
49use parking_lot::Mutex;
50use std::cell::Cell;
51use std::collections::HashSet;
52use std::fs::{File, OpenOptions};
53use std::io::{BufReader, BufWriter, Read, Write};
54use std::path::{Path, PathBuf};
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::time::{Instant, SystemTime, UNIX_EPOCH};
57use sochdb_core::{Result, SochDBError, WalRecordType};
58
// =============================================================================
// Coarse-Grained Timestamp Caching (Recommendation 5)
// =============================================================================

/// Cache validity period in nanoseconds (1ms): each thread re-reads the wall
/// clock at most once per this interval.
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Per-thread timestamp cache: `(base Instant, wall-clock µs since the Unix
    /// epoch read just before that Instant was taken)`.
    ///
    /// Seeded from the real wall clock on first access. Seeding it with `0`
    /// would make every lookup during the first `CACHE_VALIDITY_NS` on a thread
    /// return a timestamp a few microseconds after 1970.
    static TS_CACHE: Cell<(Instant, u64)> = {
        let wall_us = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_micros() as u64)
            .unwrap_or(0);
        Cell::new((Instant::now(), wall_us))
    };
}
71
72/// Get cached timestamp in microseconds
73///
74/// This function eliminates per-write `SystemTime::now()` syscalls by:
75/// 1. Caching the wall-clock timestamp in thread-local storage
76/// 2. Using monotonic `Instant` for sub-millisecond offsets
77/// 3. Only refreshing from syscall when cache expires (every ~1ms)
78///
79/// ## Performance Impact
80///
81/// - Without caching: ~20-25ns per syscall × 1M writes = 20-25ms overhead
82/// - With caching: ~20ns per 1000 writes = 0.02ms overhead (1000× improvement)
83///
84/// ## Monotonicity Guarantee
85///
86/// Timestamps are guaranteed to be monotonically increasing within a thread
87/// due to the `elapsed` offset added to the cached base timestamp.
88#[inline(always)]
89pub fn cached_timestamp_us() -> u64 {
90    TS_CACHE.with(|cache| {
91        let (instant, ts) = cache.get();
92        let elapsed_ns = instant.elapsed().as_nanos() as u64;
93        
94        if elapsed_ns < CACHE_VALIDITY_NS {
95            // Fast path: return cached + monotonic offset
96            // This is pure arithmetic, no syscall
97            ts + elapsed_ns / 1000
98        } else {
99            // Slow path: refresh cache from syscall
100            let new_ts = SystemTime::now()
101                .duration_since(UNIX_EPOCH)
102                .unwrap()
103                .as_micros() as u64;
104            cache.set((Instant::now(), new_ts));
105            new_ts
106        }
107    })
108}
109
/// Fixed per-record header size in bytes, excluding the key/value payloads and
/// the trailing checksum: length prefix (u32) + record type (u8) + txn_id (u64)
/// + timestamp (u64) + key length (u32) + value length (u32) = 29 bytes.
const RECORD_HEADER_SIZE: usize = std::mem::size_of::<u32>() // length prefix
    + std::mem::size_of::<u8>() // record type
    + std::mem::size_of::<u64>() // txn_id
    + std::mem::size_of::<u64>() // timestamp_us
    + std::mem::size_of::<u32>() // key_len
    + std::mem::size_of::<u32>(); // value_len

/// Size of the trailing CRC32 checksum in bytes.
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

/// Initial capacity of a transaction-local WAL buffer (32 KiB), sized for a
/// typical batch of a few hundred small writes.
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 << 10;
118
// =============================================================================
// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
// =============================================================================

/// Transaction-local WAL write buffer.
///
/// Serializes every write of a transaction into an in-memory byte buffer
/// (no lock, no I/O), then hands the whole batch to the WAL with a single
/// writer-lock acquisition at commit time.
///
/// ## Performance Impact
///
/// ```text
/// Writing each record straight to the WAL:
///   1000 writes × (lock + 9 write_all + unlock) = 1000 lock acquisitions
///
/// With TxnWalBuffer:
///   1000 writes buffered locally (NO LOCK)
///   1 flush at commit (SINGLE LOCK, one write_all into the WAL writer)
/// ```
///
/// ## Usage
///
/// ```ignore
/// let mut buffer = TxnWalBuffer::new(txn_id);
///
/// // These do NOT acquire any locks
/// buffer.append(b"key1", b"value1");
/// buffer.append(b"key2", b"value2");
/// // ... hundreds of writes ...
///
/// // Single lock acquisition on the WAL writer
/// buffer.flush_to_wal(&wal)?; // same as wal.flush_buffer(&buffer)?
/// ```
///
/// Flushing only hands the bytes to the WAL's buffered writer: it neither
/// fsyncs nor writes a commit record, and it does not clear this buffer.
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction ID stamped into every buffered record.
    txn_id: u64,
    /// Serialized records, back to back, in on-disk WAL format.
    buffer: Vec<u8>,
    /// Number of complete records currently in `buffer`.
    entry_count: usize,
}
164
165impl TxnWalBuffer {
166    /// Create a new buffer for a transaction
167    #[inline]
168    pub fn new(txn_id: u64) -> Self {
169        Self {
170            txn_id,
171            buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
172            entry_count: 0,
173        }
174    }
175
176    /// Create with specific capacity
177    #[inline]
178    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
179        Self {
180            txn_id,
181            buffer: Vec::with_capacity(capacity),
182            entry_count: 0,
183        }
184    }
185
186    /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
187    ///
188    /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
189    /// This is completely lock-free and does not touch the file system.
190    ///
191    /// Uses cached timestamps to eliminate per-write syscall overhead.
192    #[inline]
193    pub fn append(&mut self, key: &[u8], value: &[u8]) {
194        // Use cached timestamp instead of syscall (Recommendation 5)
195        let timestamp_us = cached_timestamp_us();
196
197        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
198        let entry_start = self.buffer.len();
199
200        // Reserve space for length prefix (will fill at end)
201        self.buffer.extend_from_slice(&[0u8; 4]);
202
203        let mut hasher = crc32fast::Hasher::new();
204
205        // Record type (Data = 0)
206        let record_type_byte = WalRecordType::Data as u8;
207        self.buffer.push(record_type_byte);
208        hasher.update(&[record_type_byte]);
209
210        // Transaction ID
211        let txn_bytes = self.txn_id.to_le_bytes();
212        self.buffer.extend_from_slice(&txn_bytes);
213        hasher.update(&txn_bytes);
214
215        // Timestamp
216        let ts_bytes = timestamp_us.to_le_bytes();
217        self.buffer.extend_from_slice(&ts_bytes);
218        hasher.update(&ts_bytes);
219
220        // Key length
221        let key_len_bytes = (key.len() as u32).to_le_bytes();
222        self.buffer.extend_from_slice(&key_len_bytes);
223        hasher.update(&key_len_bytes);
224
225        // Value length
226        let val_len_bytes = (value.len() as u32).to_le_bytes();
227        self.buffer.extend_from_slice(&val_len_bytes);
228        hasher.update(&val_len_bytes);
229
230        // Key data
231        self.buffer.extend_from_slice(key);
232        hasher.update(key);
233
234        // Value data
235        self.buffer.extend_from_slice(value);
236        hasher.update(value);
237
238        // CRC32 checksum
239        self.buffer
240            .extend_from_slice(&hasher.finalize().to_le_bytes());
241
242        // Fill in length prefix (content length, not including the 4-byte length field)
243        let content_len = (total_len - 4) as u32;
244        self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
245
246        self.entry_count += 1;
247    }
248
249    /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
250    ///
251    /// Acquires the WAL lock once and writes all accumulated entries.
252    /// Returns the first sequence number assigned to buffered entries.
253    ///
254    /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
255    /// as it properly handles the private writer field.
256    #[inline]
257    pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
258        wal.flush_buffer(self)
259    }
260
261    /// Clear the buffer (for reuse)
262    #[inline]
263    pub fn clear(&mut self) {
264        self.buffer.clear();
265        self.entry_count = 0;
266    }
267
268    /// Get number of buffered entries
269    #[inline]
270    pub fn entry_count(&self) -> usize {
271        self.entry_count
272    }
273
274    /// Get total bytes buffered
275    #[inline]
276    pub fn bytes_buffered(&self) -> usize {
277        self.buffer.len()
278    }
279
280    /// Check if buffer is empty
281    #[inline]
282    pub fn is_empty(&self) -> bool {
283        self.buffer.is_empty()
284    }
285}
286
287/// WAL entry for transaction-aware operations
288#[derive(Debug, Clone)]
289pub struct TxnWalEntry {
290    /// Record type
291    pub record_type: WalRecordType,
292    /// Transaction ID
293    pub txn_id: u64,
294    /// Timestamp in microseconds
295    pub timestamp_us: u64,
296    /// Key data
297    pub key: Vec<u8>,
298    /// Value data
299    pub value: Vec<u8>,
300}
301
302impl TxnWalEntry {
303    /// Create a new data entry
304    pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
305        Self {
306            record_type: WalRecordType::Data,
307            txn_id,
308            timestamp_us: Self::now_us(),
309            key,
310            value,
311        }
312    }
313
314    /// Create a transaction begin entry
315    pub fn txn_begin(txn_id: u64) -> Self {
316        Self {
317            record_type: WalRecordType::TxnBegin,
318            txn_id,
319            timestamp_us: Self::now_us(),
320            key: Vec::new(),
321            value: Vec::new(),
322        }
323    }
324
325    /// Create a transaction commit entry
326    pub fn txn_commit(txn_id: u64) -> Self {
327        Self {
328            record_type: WalRecordType::TxnCommit,
329            txn_id,
330            timestamp_us: Self::now_us(),
331            key: Vec::new(),
332            value: Vec::new(),
333        }
334    }
335
336    /// Create a transaction abort entry
337    pub fn txn_abort(txn_id: u64) -> Self {
338        Self {
339            record_type: WalRecordType::TxnAbort,
340            txn_id,
341            timestamp_us: Self::now_us(),
342            key: Vec::new(),
343            value: Vec::new(),
344        }
345    }
346
347    /// Create a checkpoint entry
348    pub fn checkpoint(txn_id: u64) -> Self {
349        Self {
350            record_type: WalRecordType::Checkpoint,
351            txn_id,
352            timestamp_us: Self::now_us(),
353            key: Vec::new(),
354            value: Vec::new(),
355        }
356    }
357
358    /// Create a schema change entry
359    pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
360        Self {
361            record_type: WalRecordType::SchemaChange,
362            txn_id,
363            timestamp_us: Self::now_us(),
364            key: Vec::new(),
365            value: schema_data,
366        }
367    }
368
369    /// Get current time in microseconds (uses cached timestamp)
370    #[inline]
371    fn now_us() -> u64 {
372        cached_timestamp_us()
373    }
374
375    /// Calculate CRC32 checksum for this entry
376    ///
377    /// Uses crc32fast for portable, deterministic checksums.
378    /// The checksum covers all fields except the checksum itself.
379    /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
380    pub fn checksum(&self) -> u32 {
381        let mut hasher = crc32fast::Hasher::new();
382        hasher.update(&[self.record_type as u8]);
383        hasher.update(&self.txn_id.to_le_bytes());
384        hasher.update(&self.timestamp_us.to_le_bytes());
385        hasher.update(&(self.key.len() as u32).to_le_bytes());
386        hasher.update(&(self.value.len() as u32).to_le_bytes());
387        hasher.update(&self.key);
388        hasher.update(&self.value);
389        hasher.finalize()
390    }
391
392    /// Serialize to bytes with single-pass CRC calculation
393    ///
394    /// Optimized to calculate CRC while building the buffer,
395    /// avoiding a second pass over all data.
396    pub fn to_bytes(&self) -> Vec<u8> {
397        let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
398        let mut buf = Vec::with_capacity(total_len);
399        let mut hasher = crc32fast::Hasher::new();
400
401        // Length (not including the length field itself) - not included in CRC
402        let content_len = (total_len - 4) as u32;
403        buf.extend_from_slice(&content_len.to_le_bytes());
404
405        // Record type
406        let record_type_byte = self.record_type as u8;
407        buf.push(record_type_byte);
408        hasher.update(&[record_type_byte]);
409
410        // Transaction ID
411        let txn_bytes = self.txn_id.to_le_bytes();
412        buf.extend_from_slice(&txn_bytes);
413        hasher.update(&txn_bytes);
414
415        // Timestamp
416        let ts_bytes = self.timestamp_us.to_le_bytes();
417        buf.extend_from_slice(&ts_bytes);
418        hasher.update(&ts_bytes);
419
420        // Key length
421        let key_len_bytes = (self.key.len() as u32).to_le_bytes();
422        buf.extend_from_slice(&key_len_bytes);
423        hasher.update(&key_len_bytes);
424
425        // Value length
426        let val_len_bytes = (self.value.len() as u32).to_le_bytes();
427        buf.extend_from_slice(&val_len_bytes);
428        hasher.update(&val_len_bytes);
429
430        // Key data
431        buf.extend_from_slice(&self.key);
432        hasher.update(&self.key);
433
434        // Value data
435        buf.extend_from_slice(&self.value);
436        hasher.update(&self.value);
437
438        // CRC32 Checksum (computed in single pass above)
439        buf.extend_from_slice(&hasher.finalize().to_le_bytes());
440
441        buf
442    }
443
444    /// Deserialize from bytes with torn write detection
445    ///
446    /// Returns error if:
447    /// - Length field indicates data is too short
448    /// - CRC32 checksum mismatch (corruption or torn write)
449    /// - Invalid record type
450    pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
451        // Length (allows torn write detection - if length claims more data than exists)
452        let content_len = reader.read_u32::<LittleEndian>()?;
453        if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
454            return Err(SochDBError::Corruption("WAL entry too short".into()));
455        }
456
457        // Record type
458        let record_type_byte = reader.read_u8()?;
459        let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
460            SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
461        })?;
462
463        // Transaction ID
464        let txn_id = reader.read_u64::<LittleEndian>()?;
465
466        // Timestamp
467        let timestamp_us = reader.read_u64::<LittleEndian>()?;
468
469        // Key length
470        let key_len = reader.read_u32::<LittleEndian>()? as usize;
471
472        // Value length
473        let value_len = reader.read_u32::<LittleEndian>()? as usize;
474
475        // Key data
476        let mut key = vec![0u8; key_len];
477        reader.read_exact(&mut key)?;
478
479        // Value data
480        let mut value = vec![0u8; value_len];
481        reader.read_exact(&mut value)?;
482
483        // CRC32 Checksum
484        let stored_checksum = reader.read_u32::<LittleEndian>()?;
485
486        let entry = Self {
487            record_type,
488            txn_id,
489            timestamp_us,
490            key,
491            value,
492        };
493
494        // Verify checksum - detects both corruption and torn writes
495        if entry.checksum() != stored_checksum {
496            return Err(SochDBError::Corruption(format!(
497                "WAL checksum mismatch for txn_id {}: expected {}, got {}",
498                txn_id,
499                entry.checksum(),
500                stored_checksum
501            )));
502        }
503
504        Ok(entry)
505    }
506}
507
508/// Transaction-aware Write-Ahead Log
509pub struct TxnWal {
510    /// Path to WAL file
511    path: PathBuf,
512    /// Buffered writer
513    writer: Mutex<BufWriter<File>>,
514    /// Next transaction ID
515    next_txn_id: AtomicU64,
516    /// Write sequence number
517    sequence: AtomicU64,
518    /// Bytes written since last sync
519    bytes_since_sync: AtomicU64,
520    /// Cached timestamp (microseconds since epoch)
521    /// Updated periodically to avoid syscall per write
522    cached_timestamp_us: AtomicU64,
523}
524
525impl TxnWal {
526    /// Create a new WAL or open existing one
527    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
528        let path = path.as_ref().to_path_buf();
529
530        // Ensure parent directory exists
531        if let Some(parent) = path.parent() {
532            std::fs::create_dir_all(parent)?;
533        }
534
535        let file = OpenOptions::new()
536            .create(true)
537            .append(true)
538            .read(true)
539            .open(&path)?;
540
541        // Use 256KB buffer for better batch performance (default is 8KB)
542        // This reduces system calls when buffering many small writes
543        let now_us = cached_timestamp_us();
544
545        let wal = Self {
546            path,
547            writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
548            next_txn_id: AtomicU64::new(1),
549            sequence: AtomicU64::new(0),
550            bytes_since_sync: AtomicU64::new(0),
551            cached_timestamp_us: AtomicU64::new(now_us),
552        };
553
554        // Recover state from existing WAL
555        wal.recover_state()?;
556
557        Ok(wal)
558    }
559
560    /// Recover state (next txn ID, sequence) from existing WAL
561    fn recover_state(&self) -> Result<()> {
562        let file = File::open(&self.path)?;
563        let mut reader = BufReader::new(file);
564        let mut max_txn_id: u64 = 0;
565        let mut count: u64 = 0;
566
567        loop {
568            match TxnWalEntry::from_reader(&mut reader) {
569                Ok(entry) => {
570                    if entry.txn_id > max_txn_id {
571                        max_txn_id = entry.txn_id;
572                    }
573                    count += 1;
574                }
575                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
576                    break;
577                }
578                Err(_) => {
579                    // Ignore corruption at end of WAL (incomplete write)
580                    break;
581                }
582            }
583        }
584
585        self.next_txn_id.store(max_txn_id + 1, Ordering::SeqCst);
586        self.sequence.store(count, Ordering::SeqCst);
587
588        Ok(())
589    }
590
591    /// Get cached timestamp, updating if stale (>1ms old)
592    ///
593    /// This avoids a syscall per write by caching the timestamp.
594    /// For WAL purposes, ~1ms granularity is sufficient.
595    #[inline]
596    fn get_cached_timestamp(&self) -> u64 {
597        // Fast path: use cached value (no syscall)
598        let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
599
600        // Refresh occasionally (every ~1000 writes or when sequence wraps)
601        // This is a very cheap check since sequence is incremented atomically anyway
602        let seq = self.sequence.load(Ordering::Relaxed);
603        if seq & 0x3FF == 0 {
604            // Refresh every 1024 writes
605            let now_us = cached_timestamp_us();
606            self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
607            return now_us;
608        }
609
610        cached
611    }
612
613    /// Append an entry to the WAL
614    ///
615    /// Returns the sequence number of this write.
616    /// NOTE: Does NOT flush - caller must call flush() or sync() for durability.
617    /// This is intentional for performance - BufWriter handles batching.
618    pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
619        let bytes = entry.to_bytes();
620        let mut writer = self.writer.lock();
621
622        writer.write_all(&bytes)?;
623        // Don't flush here - BufWriter will batch writes automatically.
624        // Call flush() explicitly before sync() or commit().
625
626        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
627        self.bytes_since_sync
628            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
629
630        Ok(seq)
631    }
632
633    /// Append entry without flushing (for batched writes)
634    ///
635    /// Caller must call flush() or sync() afterward to ensure durability.
636    #[inline]
637    pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
638        let bytes = entry.to_bytes();
639        let mut writer = self.writer.lock();
640
641        writer.write_all(&bytes)?;
642        // No flush - let BufWriter buffer the writes
643
644        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
645        self.bytes_since_sync
646            .fetch_add(bytes.len() as u64, Ordering::Relaxed);
647
648        Ok(seq)
649    }
650
651    /// Write data without flushing (for batched writes within a transaction)
652    #[inline]
653    pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
654        let entry = TxnWalEntry::data(txn_id, key, value);
655        self.append_no_flush(&entry)
656    }
657
658    /// Write data from slices without any allocation
659    ///
660    /// This is the fastest path for writing - no intermediate Vec allocations.
661    /// Calculates CRC32 while serializing directly to BufWriter.
662    #[inline]
663    pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
664        // Use coarse timestamp (cached every ~1ms) instead of syscall per write
665        let timestamp_us = self.get_cached_timestamp();
666
667        let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
668        let mut hasher = crc32fast::Hasher::new();
669
670        let mut writer = self.writer.lock();
671
672        // Length (not included in CRC)
673        let content_len = (total_len - 4) as u32;
674        writer.write_all(&content_len.to_le_bytes())?;
675
676        // Record type
677        let record_type_byte = WalRecordType::Data as u8;
678        writer.write_all(&[record_type_byte])?;
679        hasher.update(&[record_type_byte]);
680
681        // Transaction ID
682        let txn_bytes = txn_id.to_le_bytes();
683        writer.write_all(&txn_bytes)?;
684        hasher.update(&txn_bytes);
685
686        // Timestamp
687        let ts_bytes = timestamp_us.to_le_bytes();
688        writer.write_all(&ts_bytes)?;
689        hasher.update(&ts_bytes);
690
691        // Key length
692        let key_len_bytes = (key.len() as u32).to_le_bytes();
693        writer.write_all(&key_len_bytes)?;
694        hasher.update(&key_len_bytes);
695
696        // Value length
697        let val_len_bytes = (value.len() as u32).to_le_bytes();
698        writer.write_all(&val_len_bytes)?;
699        hasher.update(&val_len_bytes);
700
701        // Key data
702        writer.write_all(key)?;
703        hasher.update(key);
704
705        // Value data
706        writer.write_all(value)?;
707        hasher.update(value);
708
709        // CRC32 checksum
710        writer.write_all(&hasher.finalize().to_le_bytes())?;
711
712        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
713        self.bytes_since_sync
714            .fetch_add(total_len as u64, Ordering::Relaxed);
715
716        Ok(seq)
717    }
718
719    /// Flush pending writes to kernel buffer (but not to disk)
720    pub fn flush(&self) -> Result<()> {
721        let mut writer = self.writer.lock();
722        writer.flush()?;
723        Ok(())
724    }
725
726    /// Append and immediately sync for durability
727    pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
728        let seq = self.append(entry)?;
729        self.sync()?;
730        Ok(seq)
731    }
732
733    /// Force sync to disk
734    pub fn sync(&self) -> Result<()> {
735        let writer = self.writer.lock();
736        writer.get_ref().sync_all()?;
737        self.bytes_since_sync.store(0, Ordering::Relaxed);
738        Ok(())
739    }
740
741    /// Flush a TxnWalBuffer with single lock acquisition
742    ///
743    /// This is the high-performance path for transaction commit:
744    /// - All writes during the transaction are buffered locally
745    /// - At commit, this method flushes everything with ONE lock
746    ///
747    /// ## Performance
748    ///
749    /// ```text
750    /// 1000 writes with individual flush: 1000 × lock overhead
751    /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
752    /// Speedup: ~1000× for lock overhead
753    /// ```
754    #[inline]
755    pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
756        if buffer.is_empty() {
757            return Ok(0);
758        }
759
760        let mut writer = self.writer.lock();
761        writer.write_all(&buffer.buffer)?;
762
763        let seq = self
764            .sequence
765            .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
766        self.bytes_since_sync
767            .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
768
769        Ok(seq)
770    }
771
772    /// Get the current size of the WAL file in bytes
773    pub fn size_bytes(&self) -> u64 {
774        std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
775    }
776
777    /// Allocate a new transaction ID
778    pub fn alloc_txn_id(&self) -> u64 {
779        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
780    }
781
782    /// Begin a new transaction
783    pub fn begin_transaction(&self) -> Result<u64> {
784        let txn_id = self.alloc_txn_id();
785        let entry = TxnWalEntry::txn_begin(txn_id);
786        self.append(&entry)?;
787        Ok(txn_id)
788    }
789
790    /// Commit a transaction (with fsync for durability)
791    ///
792    /// This flushes all pending writes and then fsyncs the commit record.
793    pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
794        // First flush any pending buffered writes
795        self.flush()?;
796
797        // Then write commit record with fsync
798        let entry = TxnWalEntry::txn_commit(txn_id);
799        self.append_sync(&entry)?;
800        Ok(())
801    }
802
803    /// Abort a transaction
804    pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
805        let entry = TxnWalEntry::txn_abort(txn_id);
806        self.append(&entry)?;
807        Ok(())
808    }
809
810    /// Write data within a transaction
811    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
812        let entry = TxnWalEntry::data(txn_id, key, value);
813        self.append(&entry)
814    }
815
816    /// Replay WAL for crash recovery
817    ///
818    /// Returns (committed_writes, recovered_txn_count)
819    #[allow(clippy::type_complexity)]
820    pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
821        let file = File::open(&self.path)?;
822        let mut reader = BufReader::new(file);
823
824        let mut committed_txns: HashSet<u64> = HashSet::new();
825        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
826            std::collections::HashMap::new();
827
828        // First pass: find committed transactions
829        loop {
830            match TxnWalEntry::from_reader(&mut reader) {
831                Ok(entry) => match entry.record_type {
832                    WalRecordType::TxnBegin => {
833                        pending_writes.insert(entry.txn_id, Vec::new());
834                    }
835                    WalRecordType::Data => {
836                        if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
837                            writes.push((entry.key, entry.value));
838                        }
839                    }
840                    WalRecordType::TxnCommit => {
841                        committed_txns.insert(entry.txn_id);
842                    }
843                    WalRecordType::TxnAbort => {
844                        pending_writes.remove(&entry.txn_id);
845                    }
846                    _ => {}
847                },
848                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
849                    break;
850                }
851                Err(_) => {
852                    break;
853                }
854            }
855        }
856
857        // Collect committed writes
858        let mut result = Vec::new();
859        let mut txn_count = 0;
860
861        for (txn_id, writes) in pending_writes {
862            if committed_txns.contains(&txn_id) {
863                result.extend(writes);
864                txn_count += 1;
865            }
866            // Uncommitted transactions are discarded (rollback)
867        }
868
869        Ok((result, txn_count))
870    }
871
872    /// Replay WAL with a callback
873    pub fn replay<F>(&self, mut callback: F) -> Result<u64>
874    where
875        F: FnMut(TxnWalEntry) -> Result<()>,
876    {
877        let file = File::open(&self.path)?;
878        let mut reader = BufReader::new(file);
879        let mut count = 0u64;
880
881        loop {
882            match TxnWalEntry::from_reader(&mut reader) {
883                Ok(entry) => {
884                    callback(entry)?;
885                    count += 1;
886                }
887                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
888                    break;
889                }
890                Err(e) => {
891                    // Log warning but continue (may be incomplete entry at end)
892                    eprintln!("WAL replay warning: {:?}", e);
893                    break;
894                }
895            }
896        }
897
898        Ok(count)
899    }
900
901    /// Truncate WAL (called after successful checkpoint)
902    pub fn truncate(&self) -> Result<()> {
903        let writer = self.writer.lock();
904        let file = writer.get_ref();
905        file.set_len(0)?;
906        file.sync_all()?;
907        self.sequence.store(0, Ordering::SeqCst);
908        self.bytes_since_sync.store(0, Ordering::Relaxed);
909        Ok(())
910    }
911
912    /// Write a checkpoint marker
913    pub fn write_checkpoint(&self) -> Result<u64> {
914        let entry = TxnWalEntry::checkpoint(0);
915        self.append_sync(&entry)
916    }
917
918    /// Write a Compensation Log Record (CLR) for undo operations
919    ///
920    /// CLRs are redo-only records that skip past already undone operations
921    /// during recovery. The undo_next_lsn field tells recovery where to
922    /// continue undoing after this CLR.
923    pub fn append_clr(
924        &self,
925        txn_id: u64,
926        _original_lsn: u64,
927        undo_next_lsn: Option<u64>,
928        undo_data: &[u8],
929    ) -> Result<u64> {
930        // Encode undo_next_lsn into the key field
931        let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
932        let entry = TxnWalEntry {
933            record_type: WalRecordType::CompensationLogRecord,
934            txn_id,
935            timestamp_us: TxnWalEntry::now_us(),
936            key, // undo_next_lsn encoded in key
937            value: undo_data.to_vec(),
938        };
939        self.append(&entry)
940    }
941
942    /// Write checkpoint with data (for fuzzy checkpoints)
943    pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
944        let entry = TxnWalEntry {
945            record_type: WalRecordType::Checkpoint,
946            txn_id: 0,
947            timestamp_us: TxnWalEntry::now_us(),
948            key: Vec::new(),
949            value: checkpoint_data.to_vec(),
950        };
951        self.append_sync(&entry)
952    }
953
954    /// Write checkpoint end with captured state
955    pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
956        let entry = TxnWalEntry {
957            record_type: WalRecordType::CheckpointEnd,
958            txn_id: 0,
959            timestamp_us: TxnWalEntry::now_us(),
960            key: Vec::new(),
961            value: checkpoint_data.to_vec(),
962        };
963        self.append_sync(&entry)
964    }
965
    /// Get current sequence number.
    ///
    /// Loaded with `SeqCst`; `truncate` resets it to 0.
    pub fn sequence(&self) -> u64 {
        self.sequence.load(Ordering::SeqCst)
    }
970
    /// Get bytes written since last sync.
    ///
    /// A `Relaxed` load, so it is a statistic rather than a synchronization
    /// point; `truncate` resets it to 0.
    pub fn bytes_since_sync(&self) -> u64 {
        self.bytes_since_sync.load(Ordering::Relaxed)
    }
975
    /// Get path to WAL file (the file opened by `replay` and `crash_recovery`).
    pub fn path(&self) -> &Path {
        &self.path
    }
980}
981
/// Statistics about WAL state.
///
/// A point-in-time snapshot; the counters are read individually, not as one
/// atomic unit.
#[derive(Debug, Clone, Default)]
pub struct TxnWalStats {
    /// Number of entries written (the WAL's current sequence counter)
    pub entries_written: u64,
    /// Bytes written since last sync
    pub bytes_since_sync: u64,
    /// Current transaction ID counter (the next ID to be handed out)
    pub next_txn_id: u64,
}
992
993// ============================================================================
994// Sharded WAL for Reduced Mutex Contention
995// ============================================================================
996
/// Sharded Write-Ahead Log for high-concurrency workloads.
///
/// Instead of funnelling every append through a single `Mutex<File>`, records
/// are first buffered in per-shard memory buffers:
/// - A transaction's shard is `txn_id & (num_shards - 1)`, so all records of one
///   transaction land in the same shard and stay in order.
/// - Each shard has its own mutex-protected buffer, so appends from
///   transactions on different shards do not contend with each other.
/// - `flush`/`sync` copy every shard's buffer into one central file, visiting
///   shards in index order. Records from different shards are therefore grouped
///   by shard in the file, not ordered by the global `sequence` counter.
///
/// The payoff is that up to `num_shards` appenders can proceed in parallel,
/// instead of serializing on one lock.
// NOTE(review): presumably silences warnings for fields not read yet
// (e.g. `path` is not read by the methods in this file) — confirm and narrow.
#[allow(dead_code)]
pub struct ShardedWal {
    /// Per-shard record buffers (`txn_id & (num_shards - 1)` selects the shard)
    shards: Vec<parking_lot::Mutex<WalShard>>,
    /// Number of shards (always a power of 2)
    num_shards: usize,
    /// Buffered writer for the single backing WAL file; shard buffers are
    /// copied into it on flush
    central_writer: parking_lot::Mutex<BufWriter<File>>,
    /// Next transaction ID to allocate
    next_txn_id: AtomicU64,
    /// Count of entries appended across all shards (not the file byte order)
    sequence: AtomicU64,
    /// Location of the backing WAL file
    path: PathBuf,
}
1020
/// Individual WAL shard buffer: serialized records waiting to be copied into
/// the central WAL file.
struct WalShard {
    /// Buffered entries for this shard, as concatenated `to_bytes()` encodings
    /// in append order
    buffer: Vec<u8>,
    /// Number of entries buffered (reset to 0 when the buffer is drained)
    entry_count: usize,
}
1028
1029impl WalShard {
1030    fn new() -> Self {
1031        Self {
1032            buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1033            entry_count: 0,
1034        }
1035    }
1036
1037    fn append(&mut self, entry: &TxnWalEntry) {
1038        let bytes = entry.to_bytes();
1039        self.buffer.extend_from_slice(&bytes);
1040        self.entry_count += 1;
1041    }
1042
1043    fn is_empty(&self) -> bool {
1044        self.buffer.is_empty()
1045    }
1046
1047    fn drain(&mut self) -> Vec<u8> {
1048        self.entry_count = 0;
1049        std::mem::take(&mut self.buffer)
1050    }
1051}
1052
1053impl ShardedWal {
1054    /// Create sharded WAL with specified number of shards
1055    ///
1056    /// Recommended: 4-8 shards for typical server workloads
1057    pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1058        let path = path.as_ref().to_path_buf();
1059
1060        if let Some(parent) = path.parent() {
1061            std::fs::create_dir_all(parent)?;
1062        }
1063
1064        let file = std::fs::OpenOptions::new()
1065            .create(true)
1066            .append(true)
1067            .read(true)
1068            .open(&path)?;
1069
1070        // Round up to power of 2 for fast modulo
1071        let num_shards = num_shards.next_power_of_two();
1072        let shards: Vec<_> = (0..num_shards)
1073            .map(|_| parking_lot::Mutex::new(WalShard::new()))
1074            .collect();
1075
1076        Ok(Self {
1077            shards,
1078            num_shards,
1079            central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1080            next_txn_id: AtomicU64::new(1),
1081            sequence: AtomicU64::new(0),
1082            path,
1083        })
1084    }
1085
1086    /// Get shard index for transaction
1087    #[inline]
1088    fn shard_idx(&self, txn_id: u64) -> usize {
1089        (txn_id as usize) & (self.num_shards - 1)
1090    }
1091
1092    /// Append entry to appropriate shard (lock-free for different txns)
1093    pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1094        let shard_idx = self.shard_idx(entry.txn_id);
1095        let mut shard = self.shards[shard_idx].lock();
1096        shard.append(entry);
1097        self.sequence.fetch_add(1, Ordering::SeqCst)
1098    }
1099
1100    /// Allocate transaction ID
1101    pub fn alloc_txn_id(&self) -> u64 {
1102        self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1103    }
1104
1105    /// Flush all shard buffers to central file
1106    pub fn flush(&self) -> Result<()> {
1107        let mut central = self.central_writer.lock();
1108
1109        // Collect all shard buffers (brief lock per shard)
1110        for shard in &self.shards {
1111            let mut shard_guard = shard.lock();
1112            if !shard_guard.is_empty() {
1113                let data = shard_guard.drain();
1114                central.write_all(&data)?;
1115            }
1116        }
1117
1118        central.flush()?;
1119        Ok(())
1120    }
1121
1122    /// Sync to disk (fsync)
1123    pub fn sync(&self) -> Result<()> {
1124        self.flush()?;
1125        let central = self.central_writer.lock();
1126        central.get_ref().sync_all()?;
1127        Ok(())
1128    }
1129
1130    /// Begin transaction
1131    pub fn begin_transaction(&self) -> Result<u64> {
1132        let txn_id = self.alloc_txn_id();
1133        let entry = TxnWalEntry::txn_begin(txn_id);
1134        self.append(&entry);
1135        Ok(txn_id)
1136    }
1137
1138    /// Write data
1139    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1140        let entry = TxnWalEntry::data(txn_id, key, value);
1141        Ok(self.append(&entry))
1142    }
1143
1144    /// Commit transaction
1145    pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1146        let entry = TxnWalEntry::txn_commit(txn_id);
1147        let seq = self.append(&entry);
1148        self.sync()?; // Fsync on commit for durability
1149        Ok(seq)
1150    }
1151
1152    /// Get statistics
1153    pub fn stats(&self) -> ShardedWalStats {
1154        let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1155        for shard in &self.shards {
1156            shard_entry_counts.push(shard.lock().entry_count);
1157        }
1158
1159        ShardedWalStats {
1160            num_shards: self.num_shards,
1161            total_entries: self.sequence.load(Ordering::SeqCst),
1162            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1163            shard_entry_counts,
1164        }
1165    }
1166}
1167
/// Statistics for sharded WAL
#[derive(Debug, Clone)]
pub struct ShardedWalStats {
    /// Number of shards (a power of two)
    pub num_shards: usize,
    /// Entries appended across all shards since creation
    pub total_entries: u64,
    /// Next transaction ID that will be allocated
    pub next_txn_id: u64,
    /// Per-shard count of records still buffered (not yet flushed to the file)
    pub shard_entry_counts: Vec<usize>,
}
1176
/// Detailed crash recovery statistics
#[derive(Debug, Clone, Default)]
pub struct CrashRecoveryStats {
    /// Total records successfully decoded from the WAL
    pub total_records: u64,
    /// Number of committed transactions whose writes were recovered
    pub committed_txns: u64,
    /// Number of uncommitted (rolled back) transactions: begun but neither
    /// committed nor aborted
    pub rolled_back_txns: u64,
    /// Number of explicitly aborted transactions
    pub aborted_txns: u64,
    /// Number of data writes recovered
    pub recovered_writes: u64,
    /// Number of torn/corrupted records at end (expected on crash). Scanning
    /// stops at the first bad record, so this is 0 or 1.
    pub torn_records: u64,
    /// As filled in by `TxnWal::crash_recovery`: the WAL file size at the start
    /// of recovery, including any torn tail that was not replayed
    pub bytes_read: u64,
    /// Recovery duration in microseconds
    pub recovery_duration_us: u64,
    /// Highest transaction ID seen (for restarting counter)
    pub max_txn_id: u64,
}
1199
1200impl TxnWal {
1201    /// Get WAL statistics
1202    pub fn stats(&self) -> TxnWalStats {
1203        TxnWalStats {
1204            entries_written: self.sequence.load(Ordering::SeqCst),
1205            bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1206            next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1207        }
1208    }
1209
1210    /// Full crash recovery with detailed statistics
1211    ///
1212    /// This method provides ACID recovery guarantees:
1213    /// 1. **Atomicity**: Uncommitted transactions are rolled back
1214    /// 2. **Durability**: All committed transactions are replayed
1215    /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1216    ///
1217    /// Returns (committed_writes, stats) where committed_writes contains
1218    /// all key-value pairs from committed transactions in order.
1219    #[allow(clippy::type_complexity)]
1220    pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1221        let start_time = std::time::Instant::now();
1222        let file = File::open(&self.path)?;
1223        let file_size = file.metadata()?.len();
1224        let mut reader = BufReader::new(file);
1225
1226        let mut stats = CrashRecoveryStats {
1227            bytes_read: file_size,
1228            ..Default::default()
1229        };
1230
1231        let mut committed_txns: HashSet<u64> = HashSet::new();
1232        let mut aborted_txns: HashSet<u64> = HashSet::new();
1233        let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1234            std::collections::HashMap::new();
1235        let mut all_txns: HashSet<u64> = HashSet::new();
1236
1237        // Read all records, stopping at first corruption (torn write)
1238        loop {
1239            match TxnWalEntry::from_reader(&mut reader) {
1240                Ok(entry) => {
1241                    stats.total_records += 1;
1242                    if entry.txn_id > stats.max_txn_id {
1243                        stats.max_txn_id = entry.txn_id;
1244                    }
1245
1246                    match entry.record_type {
1247                        WalRecordType::TxnBegin => {
1248                            pending_writes.insert(entry.txn_id, Vec::new());
1249                            all_txns.insert(entry.txn_id);
1250                        }
1251                        WalRecordType::Data => {
1252                            if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1253                                writes.push((entry.key, entry.value));
1254                            }
1255                        }
1256                        WalRecordType::TxnCommit => {
1257                            committed_txns.insert(entry.txn_id);
1258                        }
1259                        WalRecordType::TxnAbort => {
1260                            pending_writes.remove(&entry.txn_id);
1261                            aborted_txns.insert(entry.txn_id);
1262                        }
1263                        _ => {}
1264                    }
1265                }
1266                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1267                    // Clean EOF
1268                    break;
1269                }
1270                Err(_) => {
1271                    // Corruption or torn write - stop here
1272                    stats.torn_records += 1;
1273                    break;
1274                }
1275            }
1276        }
1277
1278        // Collect committed writes
1279        let mut result = Vec::new();
1280        for (txn_id, writes) in &pending_writes {
1281            if committed_txns.contains(txn_id) {
1282                stats.committed_txns += 1;
1283                stats.recovered_writes += writes.len() as u64;
1284                result.extend(writes.clone());
1285            }
1286        }
1287
1288        // Count aborted and rolled-back transactions
1289        stats.aborted_txns = aborted_txns.len() as u64;
1290        stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1291
1292        stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1293
1294        Ok((result, stats))
1295    }
1296}
1297
#[cfg(test)]
mod tests {
    // Each test drives a specific sequence of WAL records through `TxnWal`;
    // the exact order of begin/write/commit/abort calls is the scenario under test.
    use super::*;
    use tempfile::tempdir;

    /// A single data record survives `to_bytes` -> `from_reader` unchanged.
    #[test]
    fn test_wal_entry_roundtrip() {
        let entry = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        let bytes = entry.to_bytes();

        let mut cursor = std::io::Cursor::new(bytes);
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();

        assert_eq!(recovered.record_type, WalRecordType::Data);
        assert_eq!(recovered.txn_id, 42);
        assert_eq!(recovered.key, b"key");
        assert_eq!(recovered.value, b"value");
    }

    /// A committed transaction written by one `TxnWal` is recovered, with its
    /// writes in log order, by a fresh `TxnWal` on the same file.
    #[test]
    fn test_wal_append_and_replay() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write some entries
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn_id = wal.begin_transaction().unwrap();
            wal.write(txn_id, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn_id, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn_id).unwrap();
        }

        // Replay and verify
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 1);
            assert_eq!(writes.len(), 2);
            assert_eq!(writes[0], (b"k1".to_vec(), b"v1".to_vec()));
            assert_eq!(writes[1], (b"k2".to_vec(), b"v2".to_vec()));
        }
    }

    /// Writes of a transaction with no commit record are dropped on recovery,
    /// while an earlier committed transaction is kept.
    #[test]
    fn test_uncommitted_transaction_rollback() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write committed and uncommitted transactions
        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // Committed transaction
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(txn1).unwrap();

            // Uncommitted transaction (simulates crash)
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
            // No commit!
        }

        // Replay - uncommitted should be rolled back
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 1); // Only committed transaction
            assert_eq!(writes.len(), 1);
            assert_eq!(writes[0], (b"committed".to_vec(), b"yes".to_vec()));
        }
    }

    /// An explicitly aborted transaction contributes no writes and is not counted.
    #[test]
    fn test_aborted_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_path).unwrap();

            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(txn).unwrap();
        }

        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, txn_count) = wal.replay_for_recovery().unwrap();

            assert_eq!(txn_count, 0);
            assert!(writes.is_empty());
        }
    }

    /// Flipping bits in the trailing checksum makes `from_reader` reject the record.
    #[test]
    fn test_checksum_validation() {
        let entry = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec());
        let mut bytes = entry.to_bytes();

        // Corrupt the checksum (its last byte, per the CRC32-at-end layout)
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF;

        let mut cursor = std::io::Cursor::new(bytes);
        let result = TxnWalEntry::from_reader(&mut cursor);

        assert!(result.is_err());
    }

    /// `crash_recovery` stats over a mix of committed, aborted and unfinished
    /// transactions.
    #[test]
    fn test_crash_recovery_with_stats() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Simulate complex workload
        {
            let wal = TxnWal::new(&wal_path).unwrap();

            // Committed transaction 1
            let txn1 = wal.begin_transaction().unwrap();
            wal.write(txn1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.write(txn1, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            wal.commit_transaction(txn1).unwrap();

            // Aborted transaction
            let txn2 = wal.begin_transaction().unwrap();
            wal.write(txn2, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(txn2).unwrap();

            // Committed transaction 2
            let txn3 = wal.begin_transaction().unwrap();
            wal.write(txn3, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(txn3).unwrap();

            // Uncommitted transaction (simulates crash)
            let txn4 = wal.begin_transaction().unwrap();
            wal.write(txn4, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
            // No commit - simulates crash
        }

        // Recover and verify
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            // Should have 3 writes from 2 committed transactions
            assert_eq!(writes.len(), 3);
            assert_eq!(stats.committed_txns, 2);
            assert_eq!(stats.aborted_txns, 1);
            assert_eq!(stats.rolled_back_txns, 1); // txn4 was uncommitted
            assert_eq!(stats.recovered_writes, 3);
            assert!(stats.recovery_duration_us > 0);
        }
    }

    /// Garbage appended after a valid transaction is counted as one torn record,
    /// and the valid transaction before it is still recovered.
    #[test]
    fn test_torn_write_detection() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        // Write a valid transaction
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        // Append corrupted bytes to simulate torn write
        {
            use std::io::Write;
            let mut file = std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_path)
                .unwrap();
            // Write partial record (torn write): a 4-byte little-endian length
            // prefix of 16 followed by only 2 bytes of body
            file.write_all(&[0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF])
                .unwrap();
        }

        // Recovery should still work, detecting torn write
        {
            let wal = TxnWal::new(&wal_path).unwrap();
            let (writes, stats) = wal.crash_recovery().unwrap();

            // Should recover the valid transaction
            assert_eq!(writes.len(), 1);
            assert_eq!(stats.committed_txns, 1);
            assert_eq!(stats.torn_records, 1);
        }
    }

    /// Checksums are a pure function of record content and survive a roundtrip.
    #[test]
    fn test_crc32_determinism() {
        // Verify CRC32 produces consistent checksums for same content
        let mut entry1 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        entry1.timestamp_us = 12345; // Fixed timestamp for determinism

        let mut entry2 = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        entry2.timestamp_us = 12345; // Same timestamp

        assert_eq!(entry1.checksum(), entry2.checksum());

        // Different content should produce different checksum
        let mut entry3 = TxnWalEntry::data(42, b"key".to_vec(), b"different".to_vec());
        entry3.timestamp_us = 12345;
        assert_ne!(entry1.checksum(), entry3.checksum());

        // Verify roundtrip preserves checksum
        let bytes = entry1.to_bytes();
        let mut cursor = std::io::Cursor::new(bytes);
        let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
        assert_eq!(recovered.checksum(), entry1.checksum());
    }
}