sochdb_storage/txn_wal.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Transaction-Aware WAL for ACID Transactions
19//!
20//! This WAL implementation provides ACID transaction support:
21//! - Atomicity: All writes in a transaction are logged together
22//! - Consistency: Schema validation before commit
23//! - Isolation: Transaction IDs for MVCC
24//! - Durability: fsync after commit
25//!
26//! ## WAL Record Types
27//!
28//! - Data: Key-value write
29//! - TxnBegin: Start of transaction
30//! - TxnCommit: Commit point (durability guarantee)
31//! - TxnAbort: Transaction rollback
32//! - Checkpoint: Snapshot marker for recovery
33//! - SchemaChange: DDL operations
34//!
35//! ## Record Format (Fixed Layout for Torn Write Detection)
36//!
37//! ```text
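//! ┌───────────┬─────────┬──────────┬──────────────┬───────────┬─────────────┬────────┬──────────┬──────────┐
//! │ Length(4) │ Type(1) │ TxnId(8) │ Timestamp(8) │ KeyLen(4) │ ValueLen(4) │ Key(*) │ Value(*) │ CRC32(4) │
//! └───────────┴─────────┴──────────┴──────────────┴───────────┴─────────────┴────────┴──────────┴──────────┘
//! ```
//!
//! `Length` counts the bytes that follow it (Type through CRC32). The CRC32 is
//! computed over Type through Value and does not cover `Length`. This is the
//! plaintext layout; encrypted WALs wrap the same body in an AEAD frame.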
43//!
44//! ## Crash Recovery Guarantees
45//!
46//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
47//! 2. **Atomic Commit**: Commit record with fsync ensures durability
48//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
49//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
50
51use crate::encryption::EncryptionEngine;
52use byteorder::{LittleEndian, ReadBytesExt};
53use parking_lot::Mutex;
54use sochdb_core::{Result, SochDBError, WalRecordType};
55use std::cell::Cell;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::io::{BufReader, BufWriter, Read, Write};
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU64, Ordering};
62use std::time::{Instant, SystemTime, UNIX_EPOCH};
63
64// =============================================================================
65// Coarse-Grained Timestamp Caching (Recommendation 5)
66// =============================================================================
67
/// Validity window of a thread's cached wall-clock base, in nanoseconds (1 ms).
///
/// Inside this window [`cached_timestamp_us`] derives the timestamp from the
/// cached base plus a monotonic `Instant` offset instead of calling
/// `SystemTime::now()`.
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Per-thread timestamp cache: `(base_instant, base_timestamp_us)`.
    ///
    /// A `base_timestamp_us` of `0` is a sentinel meaning "never populated":
    /// [`cached_timestamp_us`] always refreshes from the wall clock in that case.
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
}

/// Current wall-clock time in microseconds since the UNIX epoch, cached per thread.
///
/// The wall clock (`SystemTime::now()`) is consulted only when this thread's
/// cache is unpopulated or older than [`CACHE_VALIDITY_NS`]. Otherwise the
/// result is the cached base plus the time elapsed since it was taken, measured
/// with the monotonic `Instant` clock. `Instant::elapsed` still reads a clock;
/// the saving is skipping the wall-clock read on most calls.
///
/// ## Ordering
///
/// Within one validity window the returned values are non-decreasing, because
/// they follow the monotonic offset. At a refresh the value is re-based on the
/// wall clock, so a backwards wall-clock step (e.g. NTP) can make it go back.
/// Values from different threads are not ordered relative to each other.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (instant, ts) = cache.get();
        let elapsed_ns = instant.elapsed().as_nanos() as u64;

        // Fast path: the cache has been populated on this thread and is fresh.
        // `ts == 0` must NOT take this path: the lazily initialized cache holds
        // a base of 0, which would report a timestamp near 1970.
        if ts != 0 && elapsed_ns < CACHE_VALIDITY_NS {
            return ts + elapsed_ns / 1000;
        }

        // Slow path: (re)populate the cache from the wall clock.
        let new_ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before UNIX epoch (1970-01-01)")
            .as_micros() as u64;
        cache.set((Instant::now(), new_ts));
        new_ts
    })
}
114
/// Fixed per-record header size in bytes, excluding key/value data and the
/// trailing checksum:
/// length prefix (u32) + type (u8) + txn_id (u64) + timestamp (u64) +
/// key_len (u32) + value_len (u32).
const RECORD_HEADER_SIZE: usize = std::mem::size_of::<u32>() // length prefix
    + std::mem::size_of::<u8>() // record type
    + std::mem::size_of::<u64>() // txn_id
    + std::mem::size_of::<u64>() // timestamp_us
    + std::mem::size_of::<u32>() // key_len
    + std::mem::size_of::<u32>(); // value_len

/// Size in bytes of the trailing CRC32 checksum.
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

/// Initial byte capacity of a transaction-local WAL buffer (32 KiB, sized for
/// a typical batch of a few hundred small writes).
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 << 10;
123
124// =============================================================================
125// At-rest encryption framing (Task 3B)
126// =============================================================================
127//
128// When the WAL's `EncryptionEngine` is enabled, each record is written as an
129// encrypted frame instead of the plaintext frame above:
130//
131// ```text
132// plaintext frame: [content_len:u32][type|txn|ts|klen|vlen|key|val|crc32]
133// encrypted frame: [outer_len:u32 ][ver|nonce(12)|ciphertext+tag(16)]
134// ```
135//
136// `outer_len` is the length of the AEAD envelope (ciphertext.len()); the inner
137// plaintext that the envelope protects is the record BODY — exactly the bytes
138// after `content_len` in the plaintext frame (`type..crc32`). On decrypt the
139// body is parsed by [`TxnWalEntry::parse_body`], reusing the same field layout +
140// CRC check as the plaintext path.
141//
142// The per-record AAD binds {format_version, db_uuid, dek_epoch, file-relative
143// record ordinal} so a record cannot be reordered, duplicated, spliced from
144// another DB, or read under a downgraded format without failing authentication.
145// The reader reconstructs the identical AAD from its own trusted state (the
146// keyring's db_uuid/epoch and a 0-based counter of records read from this file),
147// NEVER from the attacker-controllable on-disk bytes.
148
149/// Encode a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) — the bytes the
150/// AEAD envelope protects, identical to `to_bytes()` minus the length prefix.
151/// Used by the encrypted write paths to materialize a record before sealing it.
152fn encode_record_body(
153 record_type: WalRecordType,
154 txn_id: u64,
155 timestamp_us: u64,
156 key: &[u8],
157 value: &[u8],
158) -> Vec<u8> {
159 let body_len = (RECORD_HEADER_SIZE - 4) + key.len() + value.len() + CHECKSUM_SIZE;
160 let mut body = Vec::with_capacity(body_len);
161 let mut hasher = crc32fast::Hasher::new();
162 let rt = record_type as u8;
163 body.push(rt);
164 hasher.update(&[rt]);
165 let t = txn_id.to_le_bytes();
166 body.extend_from_slice(&t);
167 hasher.update(&t);
168 let ts = timestamp_us.to_le_bytes();
169 body.extend_from_slice(&ts);
170 hasher.update(&ts);
171 let kl = (key.len() as u32).to_le_bytes();
172 body.extend_from_slice(&kl);
173 hasher.update(&kl);
174 let vl = (value.len() as u32).to_le_bytes();
175 body.extend_from_slice(&vl);
176 hasher.update(&vl);
177 body.extend_from_slice(key);
178 hasher.update(key);
179 body.extend_from_slice(value);
180 hasher.update(value);
181 body.extend_from_slice(&hasher.finalize().to_le_bytes());
182 body
183}
184
/// A Point-in-Time Recovery target (Task 3B PITR). See
/// [`TxnWal::replay_to_target`] for the prefix semantics.
///
/// Both variants select a PREFIX of the WAL in write order; neither filters
/// individual records out of the middle of the log.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryTarget {
    /// Recover the first `lsn` WAL records (exact). Matches the value of
    /// `DurableStorage::current_lsn()` captured at the desired point.
    Lsn(u64),
    /// Roll the WAL forward and STOP at the first transaction whose COMMIT
    /// timestamp (microseconds since the epoch) exceeds this value — that
    /// transaction and everything after it are excluded. This is an exact
    /// WAL-order PREFIX, NOT a timestamp filter: if commit timestamps are
    /// non-monotonic in WAL order (coarse ~1ms clock, NTP steps, cross-thread
    /// group commit), a later-in-WAL transaction with `ts <= t` that follows an
    /// excluded commit is ALSO excluded. Prefer `Lsn` for an exact, clock-
    /// independent cut.
    Timestamp(u64),
}
202
/// Version byte that leads every WAL record AAD. Part of the on-disk contract:
/// the reader rebuilds the AAD with this value, so changing it makes existing
/// encrypted records fail authentication.
const WAL_AAD_VERSION: u8 = 1;

/// Total AAD length in bytes:
/// version (u8) + db_uuid (16 bytes) + dek_epoch (u32) + record_ordinal (u64).
const WAL_AAD_LEN: usize = std::mem::size_of::<u8>()
    + 16
    + std::mem::size_of::<u32>()
    + std::mem::size_of::<u64>();

/// Largest accepted value of a WAL frame's length prefix (512 MiB). Readers
/// reject anything larger before allocating, so a corrupted prefix cannot
/// trigger a multi-GB `read_exact` allocation / DoS.
const MAX_WAL_FRAME_LEN: u32 = 1 << 29;
210
211// =============================================================================
212// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
213// =============================================================================
214
/// Transaction-local WAL write buffer.
///
/// Collects every write of one transaction in memory as fully serialized
/// frames. The shared WAL lock is therefore needed only once, when the buffer
/// is handed to the WAL at commit time.
///
/// ## Performance Impact
///
/// Illustrative estimate only. The ~20ns per-lock cost is an assumed figure,
/// not a measurement.
///
/// ```text
/// Without TxnWalBuffer:
/// 1000 writes × (lock + 9 write_all + unlock) = 1000 lock acquisitions
/// Lock overhead: 1000 × 20ns = 20µs per transaction
///
/// With TxnWalBuffer:
/// 1000 writes buffered locally (NO LOCK)
/// 1 flush at commit (SINGLE LOCK)
/// Lock overhead: 1 × 20ns = 20ns per transaction
/// ```
///
/// ## Encoding
///
/// Each `append` writes a complete PLAINTEXT frame:
/// `[content_len:u32][type|txn|ts|klen|vlen|key|val|crc32]`. The buffer itself
/// never encrypts. NOTE(review): for an encrypted WAL, confirm that
/// `TxnWal::flush_buffer` re-frames these bytes rather than writing them
/// verbatim.
///
/// ## Usage
///
/// ```ignore
/// let mut buffer = TxnWalBuffer::new(txn_id);
///
/// // These do NOT acquire any locks
/// buffer.append(b"key1", b"value1");
/// buffer.append(b"key2", b"value2");
/// // ... hundreds of writes ...
///
/// // Hand everything to the WAL under a single lock acquisition
/// buffer.flush_to_wal(&wal)?;
/// ```
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction ID stamped into every buffered record
    txn_id: u64,
    /// Concatenated serialized frames, in append order
    buffer: Vec<u8>,
    /// Number of frames currently held in `buffer`
    entry_count: usize,
}
256
257impl TxnWalBuffer {
258 /// Create a new buffer for a transaction
259 #[inline]
260 pub fn new(txn_id: u64) -> Self {
261 Self {
262 txn_id,
263 buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
264 entry_count: 0,
265 }
266 }
267
268 /// Create with specific capacity
269 #[inline]
270 pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
271 Self {
272 txn_id,
273 buffer: Vec::with_capacity(capacity),
274 entry_count: 0,
275 }
276 }
277
278 /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
279 ///
280 /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
281 /// This is completely lock-free and does not touch the file system.
282 ///
283 /// Uses cached timestamps to eliminate per-write syscall overhead.
284 #[inline]
285 pub fn append(&mut self, key: &[u8], value: &[u8]) {
286 // Use cached timestamp instead of syscall (Recommendation 5)
287 let timestamp_us = cached_timestamp_us();
288
289 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
290 let entry_start = self.buffer.len();
291
292 // Reserve space for length prefix (will fill at end)
293 self.buffer.extend_from_slice(&[0u8; 4]);
294
295 let mut hasher = crc32fast::Hasher::new();
296
297 // Record type (Data = 0)
298 let record_type_byte = WalRecordType::Data as u8;
299 self.buffer.push(record_type_byte);
300 hasher.update(&[record_type_byte]);
301
302 // Transaction ID
303 let txn_bytes = self.txn_id.to_le_bytes();
304 self.buffer.extend_from_slice(&txn_bytes);
305 hasher.update(&txn_bytes);
306
307 // Timestamp
308 let ts_bytes = timestamp_us.to_le_bytes();
309 self.buffer.extend_from_slice(&ts_bytes);
310 hasher.update(&ts_bytes);
311
312 // Key length
313 let key_len_bytes = (key.len() as u32).to_le_bytes();
314 self.buffer.extend_from_slice(&key_len_bytes);
315 hasher.update(&key_len_bytes);
316
317 // Value length
318 let val_len_bytes = (value.len() as u32).to_le_bytes();
319 self.buffer.extend_from_slice(&val_len_bytes);
320 hasher.update(&val_len_bytes);
321
322 // Key data
323 self.buffer.extend_from_slice(key);
324 hasher.update(key);
325
326 // Value data
327 self.buffer.extend_from_slice(value);
328 hasher.update(value);
329
330 // CRC32 checksum
331 self.buffer
332 .extend_from_slice(&hasher.finalize().to_le_bytes());
333
334 // Fill in length prefix (content length, not including the 4-byte length field)
335 let content_len = (total_len - 4) as u32;
336 self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
337
338 self.entry_count += 1;
339 }
340
341 /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
342 ///
343 /// Acquires the WAL lock once and writes all accumulated entries.
344 /// Returns the first sequence number assigned to buffered entries.
345 ///
346 /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
347 /// as it properly handles the private writer field.
348 #[inline]
349 pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
350 wal.flush_buffer(self)
351 }
352
353 /// Clear the buffer (for reuse)
354 #[inline]
355 pub fn clear(&mut self) {
356 self.buffer.clear();
357 self.entry_count = 0;
358 }
359
360 /// Get number of buffered entries
361 #[inline]
362 pub fn entry_count(&self) -> usize {
363 self.entry_count
364 }
365
366 /// Get total bytes buffered
367 #[inline]
368 pub fn bytes_buffered(&self) -> usize {
369 self.buffer.len()
370 }
371
372 /// Check if buffer is empty
373 #[inline]
374 pub fn is_empty(&self) -> bool {
375 self.buffer.is_empty()
376 }
377}
378
/// WAL entry for transaction-aware operations: one record, either about to be
/// encoded or just decoded.
///
/// The CRC32 is not stored in this struct. It is recomputed from these fields
/// when serializing and when verifying a decoded record.
#[derive(Debug, Clone)]
pub struct TxnWalEntry {
    /// Record type (data, transaction begin/commit/abort, checkpoint, schema change)
    pub record_type: WalRecordType,
    /// Transaction ID this record belongs to
    pub txn_id: u64,
    /// Timestamp in microseconds since the UNIX epoch
    pub timestamp_us: u64,
    /// Key data (empty for control records)
    pub key: Vec<u8>,
    /// Value data (schema-change records carry their payload here)
    pub value: Vec<u8>,
}
393
394impl TxnWalEntry {
395 /// Create a new data entry
396 pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
397 Self {
398 record_type: WalRecordType::Data,
399 txn_id,
400 timestamp_us: Self::now_us(),
401 key,
402 value,
403 }
404 }
405
406 /// Create a transaction begin entry
407 pub fn txn_begin(txn_id: u64) -> Self {
408 Self {
409 record_type: WalRecordType::TxnBegin,
410 txn_id,
411 timestamp_us: Self::now_us(),
412 key: Vec::new(),
413 value: Vec::new(),
414 }
415 }
416
417 /// Create a transaction commit entry
418 pub fn txn_commit(txn_id: u64) -> Self {
419 Self {
420 record_type: WalRecordType::TxnCommit,
421 txn_id,
422 timestamp_us: Self::now_us(),
423 key: Vec::new(),
424 value: Vec::new(),
425 }
426 }
427
428 /// Create a transaction abort entry
429 pub fn txn_abort(txn_id: u64) -> Self {
430 Self {
431 record_type: WalRecordType::TxnAbort,
432 txn_id,
433 timestamp_us: Self::now_us(),
434 key: Vec::new(),
435 value: Vec::new(),
436 }
437 }
438
439 /// Create a checkpoint entry
440 pub fn checkpoint(txn_id: u64) -> Self {
441 Self {
442 record_type: WalRecordType::Checkpoint,
443 txn_id,
444 timestamp_us: Self::now_us(),
445 key: Vec::new(),
446 value: Vec::new(),
447 }
448 }
449
450 /// Create a schema change entry
451 pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
452 Self {
453 record_type: WalRecordType::SchemaChange,
454 txn_id,
455 timestamp_us: Self::now_us(),
456 key: Vec::new(),
457 value: schema_data,
458 }
459 }
460
461 /// Get current time in microseconds (uses cached timestamp)
462 #[inline]
463 fn now_us() -> u64 {
464 cached_timestamp_us()
465 }
466
467 /// Calculate CRC32 checksum for this entry
468 ///
469 /// Uses crc32fast for portable, deterministic checksums.
470 /// The checksum covers all fields except the checksum itself.
471 /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
472 pub fn checksum(&self) -> u32 {
473 let mut hasher = crc32fast::Hasher::new();
474 hasher.update(&[self.record_type as u8]);
475 hasher.update(&self.txn_id.to_le_bytes());
476 hasher.update(&self.timestamp_us.to_le_bytes());
477 hasher.update(&(self.key.len() as u32).to_le_bytes());
478 hasher.update(&(self.value.len() as u32).to_le_bytes());
479 hasher.update(&self.key);
480 hasher.update(&self.value);
481 hasher.finalize()
482 }
483
484 /// Serialize to bytes with single-pass CRC calculation
485 ///
486 /// Optimized to calculate CRC while building the buffer,
487 /// avoiding a second pass over all data.
488 pub fn to_bytes(&self) -> Vec<u8> {
489 let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
490 let mut buf = Vec::with_capacity(total_len);
491 let mut hasher = crc32fast::Hasher::new();
492
493 // Length (not including the length field itself) - not included in CRC
494 let content_len = (total_len - 4) as u32;
495 buf.extend_from_slice(&content_len.to_le_bytes());
496
497 // Record type
498 let record_type_byte = self.record_type as u8;
499 buf.push(record_type_byte);
500 hasher.update(&[record_type_byte]);
501
502 // Transaction ID
503 let txn_bytes = self.txn_id.to_le_bytes();
504 buf.extend_from_slice(&txn_bytes);
505 hasher.update(&txn_bytes);
506
507 // Timestamp
508 let ts_bytes = self.timestamp_us.to_le_bytes();
509 buf.extend_from_slice(&ts_bytes);
510 hasher.update(&ts_bytes);
511
512 // Key length
513 let key_len_bytes = (self.key.len() as u32).to_le_bytes();
514 buf.extend_from_slice(&key_len_bytes);
515 hasher.update(&key_len_bytes);
516
517 // Value length
518 let val_len_bytes = (self.value.len() as u32).to_le_bytes();
519 buf.extend_from_slice(&val_len_bytes);
520 hasher.update(&val_len_bytes);
521
522 // Key data
523 buf.extend_from_slice(&self.key);
524 hasher.update(&self.key);
525
526 // Value data
527 buf.extend_from_slice(&self.value);
528 hasher.update(&self.value);
529
530 // CRC32 Checksum (computed in single pass above)
531 buf.extend_from_slice(&hasher.finalize().to_le_bytes());
532
533 buf
534 }
535
536 /// Deserialize a PLAINTEXT WAL frame from a reader, with torn-write detection.
537 ///
538 /// This is the legacy / unencrypted reader. The live replay path on an
539 /// encrypted WAL does NOT use this — it uses [`TxnWal::read_record`], which
540 /// decrypts first and then calls [`Self::parse_body`]. Keeping this method
541 /// plaintext-only ensures a stray caller can never silently mis-parse
542 /// ciphertext as a plaintext frame.
543 ///
544 /// Returns error if:
545 /// - Length field indicates data is too short or implausibly large
546 /// - CRC32 checksum mismatch (corruption or torn write)
547 /// - Invalid record type
548 pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
549 // Length (allows torn write detection - if length claims more data than exists)
550 let content_len = reader.read_u32::<LittleEndian>()?;
551 if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
552 return Err(SochDBError::Corruption("WAL entry too short".into()));
553 }
554 if content_len > MAX_WAL_FRAME_LEN {
555 return Err(SochDBError::Corruption(format!(
556 "WAL entry length {content_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
557 )));
558 }
559
560 // Read the whole body, then parse. A short read here (torn tail) surfaces
561 // as Io(UnexpectedEof), which replay treats as a clean end-of-WAL.
562 let mut body = vec![0u8; content_len as usize];
563 reader.read_exact(&mut body)?;
564 Self::parse_body(&body)
565 }
566
567 /// Parse a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) and verify its
568 /// CRC. This is the bytes after the `content_len` prefix in a plaintext frame,
569 /// and is exactly what the AEAD envelope protects in an encrypted frame — so
570 /// both the plaintext and decrypt paths share this single parser.
571 pub fn parse_body(body: &[u8]) -> Result<Self> {
572 let mut cur = std::io::Cursor::new(body);
573
574 let record_type_byte = cur.read_u8()?;
575 let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
576 SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
577 })?;
578
579 let txn_id = cur.read_u64::<LittleEndian>()?;
580 let timestamp_us = cur.read_u64::<LittleEndian>()?;
581 let key_len = cur.read_u32::<LittleEndian>()? as usize;
582 let value_len = cur.read_u32::<LittleEndian>()? as usize;
583
584 let mut key = vec![0u8; key_len];
585 cur.read_exact(&mut key)?;
586 let mut value = vec![0u8; value_len];
587 cur.read_exact(&mut value)?;
588
589 let stored_checksum = cur.read_u32::<LittleEndian>()?;
590
591 let entry = Self {
592 record_type,
593 txn_id,
594 timestamp_us,
595 key,
596 value,
597 };
598
599 // Verify checksum - detects both corruption and torn writes
600 if entry.checksum() != stored_checksum {
601 return Err(SochDBError::Corruption(format!(
602 "WAL checksum mismatch for txn_id {}: expected {}, got {}",
603 txn_id,
604 entry.checksum(),
605 stored_checksum
606 )));
607 }
608
609 Ok(entry)
610 }
611
612 /// The record BODY bytes (`type..crc32`) — i.e. `to_bytes()` without the
613 /// leading 4-byte content_len. This is what gets fed to the AEAD on the
614 /// encrypted write path.
615 fn body_bytes(&self) -> Vec<u8> {
616 let full = self.to_bytes();
617 full[4..].to_vec()
618 }
619}
620
/// Transaction-aware Write-Ahead Log.
///
/// Records are appended through one buffered, append-mode file handle guarded
/// by `writer`; the counters are atomics.
pub struct TxnWal {
    /// Path to the WAL file (also reopened for reading by `recover_state`)
    path: PathBuf,
    /// Buffered append-mode writer. Holding this lock serializes record
    /// framing, the write itself, and AAD-ordinal advancement.
    writer: Mutex<BufWriter<File>>,
    /// Next transaction ID. Seeded on open to
    /// `(pid << 32) + highest counter already used by this pid + 1`.
    next_txn_id: AtomicU64,
    /// Write sequence number: `append` returns the pre-increment value. Seeded
    /// on open with the number of records replayed from the existing file.
    sequence: AtomicU64,
    /// Bytes written since last sync (`append` adds each frame's length)
    bytes_since_sync: AtomicU64,
    /// Shared timestamp cache (microseconds since epoch) maintained by
    /// `get_cached_timestamp`
    cached_timestamp_us: AtomicU64,
    /// At-rest encryption engine. `disabled()` (identity passthrough) for a
    /// plaintext WAL — in which case every write/read path is byte-identical to
    /// the pre-3B format. When enabled, records are written/read as encrypted
    /// frames (see the framing notes above).
    encryption: Arc<EncryptionEngine>,
    /// 16-byte database identity bound into each record's AAD (cross-DB splice
    /// defense). All-zero / unused when encryption is disabled.
    db_uuid: [u8; 16],
    /// Active DEK epoch bound into each record's AAD (downgrade-across-rotation
    /// defense). 0 / unused when encryption is disabled.
    dek_epoch: u32,
    /// File-relative count of records written to the CURRENT WAL file (reset on
    /// truncate). Used as the per-record AAD ordinal so reorder/duplication
    /// within a file fails authentication. All writes hold the `writer` lock, so
    /// this counter is only mutated under that lock; it is read locklessly by the
    /// (single-threaded) replay paths.
    records_in_file: AtomicU64,
}
654
655impl TxnWal {
656 /// Create a new plaintext WAL or open an existing one.
657 ///
658 /// Uses a disabled (passthrough) encryption engine, so the on-disk format is
659 /// byte-identical to the pre-encryption WAL. This is the constructor used by
660 /// every non-live caller (tests, tools); the live durable-storage path uses
661 /// [`Self::new_with_encryption`] to thread the keyring's engine in.
662 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
663 Self::new_with_encryption(path, Arc::new(EncryptionEngine::disabled()), [0u8; 16], 0)
664 }
665
666 /// Create/open a WAL with an explicit at-rest encryption engine.
667 ///
668 /// When `engine` is enabled, records are written and replayed as encrypted
669 /// frames bound to `db_uuid` + `dek_epoch` (see framing notes). When it is
670 /// disabled, this behaves exactly like [`Self::new`].
671 ///
672 /// `pub(crate)` by design: the engine + `db_uuid` + `dek_epoch` MUST come
673 /// from [`crate::keyring::load_or_init`] so the fail-closed open contract
674 /// (MAC + canary) and the AAD binding are enforced. The only public door to
675 /// an encrypted WAL is
676 /// [`crate::durable_storage::DurableStorage::open_with_encryption`].
677 pub(crate) fn new_with_encryption<P: AsRef<Path>>(
678 path: P,
679 encryption: Arc<EncryptionEngine>,
680 db_uuid: [u8; 16],
681 dek_epoch: u32,
682 ) -> Result<Self> {
683 let path = path.as_ref().to_path_buf();
684
685 // Ensure parent directory exists
686 if let Some(parent) = path.parent() {
687 std::fs::create_dir_all(parent)?;
688 }
689
690 let file = OpenOptions::new()
691 .create(true)
692 .append(true)
693 .read(true)
694 .open(&path)?;
695
696 // Use 256KB buffer for better batch performance (default is 8KB)
697 // This reduces system calls when buffering many small writes
698 let now_us = cached_timestamp_us();
699
700 let wal = Self {
701 path,
702 writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
703 next_txn_id: AtomicU64::new(1),
704 sequence: AtomicU64::new(0),
705 bytes_since_sync: AtomicU64::new(0),
706 cached_timestamp_us: AtomicU64::new(now_us),
707 encryption,
708 db_uuid,
709 dek_epoch,
710 records_in_file: AtomicU64::new(0),
711 };
712
713 // Recover state from existing WAL
714 wal.recover_state()?;
715
716 Ok(wal)
717 }
718
719 /// Build the per-record AAD for a given file-relative ordinal. The reader
720 /// reconstructs this from its own trusted state, never from on-disk bytes.
721 #[inline]
722 fn record_aad(&self, ordinal: u64) -> [u8; WAL_AAD_LEN] {
723 let mut aad = [0u8; WAL_AAD_LEN];
724 aad[0] = WAL_AAD_VERSION;
725 aad[1..17].copy_from_slice(&self.db_uuid);
726 aad[17..21].copy_from_slice(&self.dek_epoch.to_le_bytes());
727 aad[21..29].copy_from_slice(&ordinal.to_le_bytes());
728 aad
729 }
730
731 /// Frame a single record body for the encrypted write path:
732 /// `[outer_len:u32][version|nonce|ciphertext+tag]`, AAD-bound to `ordinal`.
733 #[inline]
734 fn encrypt_frame(&self, body: &[u8], ordinal: u64) -> Result<Vec<u8>> {
735 let env = self
736 .encryption
737 .encrypt_with_aad(body, &self.record_aad(ordinal))?;
738 let mut out = Vec::with_capacity(4 + env.len());
739 out.extend_from_slice(&(env.len() as u32).to_le_bytes());
740 out.extend_from_slice(&env);
741 Ok(out)
742 }
743
    /// Read the next record from a replay reader, decrypting if the WAL is
    /// encrypted. Returns the record and advances `*ordinal` by one.
    ///
    /// `*ordinal` is the 0-based, file-relative index of the record about to be
    /// read. On an encrypted WAL it selects the AAD that must authenticate the
    /// record. It is advanced only after the record has been fully read,
    /// decrypted and parsed, so after an error it still names the failing record.
    ///
    /// Error taxonomy (the linchpin of fail-loud recovery):
    /// - `Io(UnexpectedEof)` reading the length prefix or a short body read at
    ///   the physical tail ⇒ clean torn-tail / end-of-WAL (callers tolerate).
    /// - `Encryption(_)` (AEAD auth failure / wrong key / bad envelope) or
    ///   `Corruption(_)` (CRC / bad framing, including a length prefix above
    ///   `MAX_WAL_FRAME_LEN`) ⇒ HARD error; callers MUST abort recovery rather
    ///   than treat it as EOF.
    ///
    /// NOTE(review): an encrypted frame whose length prefix is 0 (for example
    /// a zero-filled tail left by a crash) passes the size check and reaches
    /// `decrypt_with_aad` as an empty envelope. That presumably fails as
    /// `Encryption(_)`, i.e. a hard error rather than a torn tail. Confirm that
    /// this is the intended policy.
    fn read_record<R: Read>(&self, reader: &mut R, ordinal: &mut u64) -> Result<TxnWalEntry> {
        if !self.encryption.is_enabled() {
            // Plaintext frame: length-prefixed body, CRC-checked by from_reader.
            let entry = TxnWalEntry::from_reader(reader)?;
            *ordinal += 1;
            return Ok(entry);
        }
        // Encrypted frame: [outer_len][envelope]
        let outer_len = reader.read_u32::<LittleEndian>()?; // EOF here = clean end
        if outer_len > MAX_WAL_FRAME_LEN {
            return Err(SochDBError::Corruption(format!(
                "encrypted WAL frame length {outer_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
            )));
        }
        let mut env = vec![0u8; outer_len as usize];
        reader.read_exact(&mut env)?; // short read = torn tail (UnexpectedEof)
        // Decrypt with the AAD reconstructed for THIS ordinal. A wrong key, a
        // reordered/spliced record, or tampering fails here as Encryption(_),
        // which is a hard error — never a silent EOF.
        let body = self
            .encryption
            .decrypt_with_aad(&env, &self.record_aad(*ordinal))?;
        // The authenticated plaintext is a record body; parse + CRC-check it.
        let entry = TxnWalEntry::parse_body(&body)?;
        *ordinal += 1;
        Ok(entry)
    }
778
    /// Recover state (next txn ID, sequence, AAD ordinal) by replaying every
    /// readable record of the existing WAL file.
    ///
    /// To avoid txn_id collisions when multiple processes open the same
    /// WAL concurrently, we incorporate the PID into the starting txn_id.
    /// Format: upper 32 bits = PID, lower 32 bits = counter. Distinct PIDs
    /// therefore get disjoint ID spaces. A recycled PID resumes after the
    /// highest counter already recorded for it.
    ///
    /// Replay stops at:
    /// - `Io(UnexpectedEof)`: treated as the end of the WAL (torn tail);
    /// - any other error: returned when encryption is enabled; on a plaintext
    ///   WAL it is also treated as the end of the WAL.
    ///
    /// NOTE(review): bytes after the last readable record are not truncated
    /// here, and the writer opens the file in append mode. New records are
    /// therefore written AFTER any torn or garbage tail, and the next replay
    /// starts parsing at that leftover tail. On a plaintext WAL this loop would
    /// stop there, ignoring everything appended since. On an encrypted WAL it
    /// presumably becomes a hard error on the next open. Truncating is not
    /// obviously safe with concurrent multi-process writers; confirm the
    /// intended torn-tail policy.
    ///
    /// NOTE(review): `pid_base + max_our_counter + 1` carries into the next
    /// PID's ID space if the counter ever reaches `0xFFFF_FFFF`.
    fn recover_state(&self) -> Result<()> {
        // Independent read-only handle; the append-mode writer is untouched.
        let file = File::open(&self.path)?;
        let mut reader = BufReader::new(file);
        // Total records read, across all PIDs.
        let mut count: u64 = 0;

        // Track the max counter (lower 32 bits) for OUR PID only.
        // Each process owns its own txn_id space: upper 32 bits = PID.
        // We must NOT use max_txn_id across ALL PIDs, because that would
        // place us into another PID's ID space and cause collisions.
        let our_pid = std::process::id() as u64;
        let pid_base = our_pid << 32;
        let mut max_our_counter: u64 = 0;
        // File-relative AAD ordinal; advanced by read_record on success only.
        let mut ordinal: u64 = 0;

        loop {
            match self.read_record(&mut reader, &mut ordinal) {
                Ok(entry) => {
                    count += 1;
                    // Only track counters from entries that belong to our PID
                    let entry_pid = entry.txn_id >> 32;
                    if entry_pid == our_pid {
                        let entry_counter = entry.txn_id & 0xFFFF_FFFF;
                        if entry_counter > max_our_counter {
                            max_our_counter = entry_counter;
                        }
                    }
                }
                Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(e) => {
                    // Encrypted: a non-EOF error (AEAD auth failure / tamper /
                    // corruption) is NEVER a torn tail — fail loud rather than
                    // silently truncate. Plaintext: tolerate trailing corruption
                    // as an incomplete final write (legacy behavior).
                    if self.encryption.is_enabled() {
                        return Err(e);
                    }
                    break;
                }
            }
        }

        // Start from pid_base + (max counter we've seen for our PID + 1).
        // This ensures:
        // 1. Unique across processes (different PID → different upper 32 bits)
        // 2. Unique within this process even if PID is recycled (we skip
        //    over any counters already used by a previous process with our PID)
        let next_id = pid_base + max_our_counter + 1;

        self.next_txn_id.store(next_id, Ordering::SeqCst);
        self.sequence.store(count, Ordering::SeqCst);
        // The current file already holds `count` records, so the next encrypted
        // write must continue the AAD ordinal sequence from there.
        self.records_in_file.store(count, Ordering::SeqCst);

        Ok(())
    }
843
844 /// Get cached timestamp, updating if stale (>1ms old)
845 ///
846 /// This avoids a syscall per write by caching the timestamp.
847 /// For WAL purposes, ~1ms granularity is sufficient.
848 #[inline]
849 fn get_cached_timestamp(&self) -> u64 {
850 // Fast path: use cached value (no syscall)
851 let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
852
853 // Refresh occasionally (every ~1000 writes or when sequence wraps)
854 // This is a very cheap check since sequence is incremented atomically anyway
855 let seq = self.sequence.load(Ordering::Relaxed);
856 if seq & 0x3FF == 0 {
857 // Refresh every 1024 writes
858 let now_us = cached_timestamp_us();
859 self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
860 return now_us;
861 }
862
863 cached
864 }
865
866 /// Append an entry to the WAL
867 ///
868 /// Returns the sequence number of this write.
869 ///
870 /// # Durability Contract
871 ///
872 /// This method writes data to the BufWriter but does **NOT** fsync.
873 /// The data is NOT durable until `flush()` + `sync()` are called.
874 ///
875 /// **For transaction commits, use `commit_transaction()` or `commit_durable()`**
876 /// which enforce fsync. This method is intentionally non-durable for
877 /// batching data writes within a transaction (pre-commit records).
878 ///
879 /// The group commit path (`EventDrivenGroupCommit`) is responsible for
880 /// calling `flush()` + `sync()` after batching multiple commit records.
881 pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
882 let mut writer = self.writer.lock();
883 let bytes = self.frame_for_write(entry)?;
884
885 writer.write_all(&bytes)?;
886 // Don't flush here - BufWriter will batch writes automatically.
887 // Call flush() explicitly before sync() or commit().
888 self.commit_record_ordinal();
889
890 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
891 self.bytes_since_sync
892 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
893
894 Ok(seq)
895 }
896
897 /// Serialize one entry into its on-disk frame using the CURRENT (not-yet-
898 /// advanced) AAD ordinal. MUST be called with the `writer` lock held, and the
899 /// caller MUST call [`Self::commit_record_ordinal`] only AFTER `write_all`
900 /// succeeds — so a failed or partial write never advances the in-memory
901 /// ordinal past what is actually on disk (which would desync every
902 /// subsequent record's AAD from the reader's reconstructed ordinal).
903 #[inline]
904 fn frame_for_write(&self, entry: &TxnWalEntry) -> Result<Vec<u8>> {
905 if self.encryption.is_enabled() {
906 let ord = self.records_in_file.load(Ordering::SeqCst);
907 self.encrypt_frame(&entry.body_bytes(), ord)
908 } else {
909 Ok(entry.to_bytes())
910 }
911 }
912
913 /// Advance the file-relative AAD ordinal by one record. Call ONLY after the
914 /// record's bytes have been handed to the writer (post-`write_all`).
915 #[inline]
916 fn commit_record_ordinal(&self) {
917 if self.encryption.is_enabled() {
918 self.records_in_file.fetch_add(1, Ordering::SeqCst);
919 }
920 }
921
922 /// Append entry without flushing (for batched writes)
923 ///
924 /// Caller must call flush() or sync() afterward to ensure durability.
925 #[inline]
926 pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
927 let mut writer = self.writer.lock();
928 let bytes = self.frame_for_write(entry)?;
929
930 writer.write_all(&bytes)?;
931 // No flush - let BufWriter buffer the writes
932 self.commit_record_ordinal();
933
934 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
935 self.bytes_since_sync
936 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
937
938 Ok(seq)
939 }
940
941 /// Write data without flushing (for batched writes within a transaction)
942 #[inline]
943 pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
944 let entry = TxnWalEntry::data(txn_id, key, value);
945 self.append_no_flush(&entry)
946 }
947
948 /// Write data from slices without any allocation
949 ///
950 /// This is the fastest path for writing - no intermediate Vec allocations.
951 /// Calculates CRC32 while serializing directly to BufWriter.
952 #[inline]
953 pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
954 // Use coarse timestamp (cached every ~1ms) instead of syscall per write
955 let timestamp_us = self.get_cached_timestamp();
956
957 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
958
959 let mut writer = self.writer.lock();
960
961 // Encrypted mode cannot stream field-by-field (the AEAD needs the whole
962 // body to compute its synthetic IV + tag), so materialize the body into a
963 // scratch buffer, seal it, and write one framed envelope. The plaintext
964 // path below keeps its zero-alloc streaming fast path untouched.
965 if self.encryption.is_enabled() {
966 let body = encode_record_body(WalRecordType::Data, txn_id, timestamp_us, key, value);
967 let ord = self.records_in_file.load(Ordering::SeqCst);
968 let frame = self.encrypt_frame(&body, ord)?;
969 writer.write_all(&frame)?;
970 // Advance the AAD ordinal only after the bytes are committed to the
971 // writer, so a failed write never desyncs writer/reader ordinals.
972 self.records_in_file.fetch_add(1, Ordering::SeqCst);
973 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
974 self.bytes_since_sync
975 .fetch_add(frame.len() as u64, Ordering::Relaxed);
976 return Ok(seq);
977 }
978
979 let mut hasher = crc32fast::Hasher::new();
980
981 // Length (not included in CRC)
982 let content_len = (total_len - 4) as u32;
983 writer.write_all(&content_len.to_le_bytes())?;
984
985 // Record type
986 let record_type_byte = WalRecordType::Data as u8;
987 writer.write_all(&[record_type_byte])?;
988 hasher.update(&[record_type_byte]);
989
990 // Transaction ID
991 let txn_bytes = txn_id.to_le_bytes();
992 writer.write_all(&txn_bytes)?;
993 hasher.update(&txn_bytes);
994
995 // Timestamp
996 let ts_bytes = timestamp_us.to_le_bytes();
997 writer.write_all(&ts_bytes)?;
998 hasher.update(&ts_bytes);
999
1000 // Key length
1001 let key_len_bytes = (key.len() as u32).to_le_bytes();
1002 writer.write_all(&key_len_bytes)?;
1003 hasher.update(&key_len_bytes);
1004
1005 // Value length
1006 let val_len_bytes = (value.len() as u32).to_le_bytes();
1007 writer.write_all(&val_len_bytes)?;
1008 hasher.update(&val_len_bytes);
1009
1010 // Key data
1011 writer.write_all(key)?;
1012 hasher.update(key);
1013
1014 // Value data
1015 writer.write_all(value)?;
1016 hasher.update(value);
1017
1018 // CRC32 checksum
1019 writer.write_all(&hasher.finalize().to_le_bytes())?;
1020
1021 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
1022 self.bytes_since_sync
1023 .fetch_add(total_len as u64, Ordering::Relaxed);
1024
1025 Ok(seq)
1026 }
1027
1028 /// Flush pending writes to kernel buffer (but not to disk)
1029 pub fn flush(&self) -> Result<()> {
1030 let mut writer = self.writer.lock();
1031 writer.flush()?;
1032 Ok(())
1033 }
1034
1035 /// Append and immediately sync for durability
1036 pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
1037 let seq = self.append(entry)?;
1038 self.sync()?;
1039 Ok(seq)
1040 }
1041
1042 /// Force sync to disk.
1043 ///
1044 /// Must flush the BufWriter BEFORE fsync: `get_ref()` reaches the raw File
1045 /// and bypasses the 256 KB buffer, so without an explicit `flush()` the
1046 /// just-appended commit/checkpoint record may still sit in userspace and
1047 /// `sync_all()` would fsync stale bytes — silently breaking the durability
1048 /// guarantee `append_sync`/`commit`/`checkpoint` advertise.
1049 pub fn sync(&self) -> Result<()> {
1050 let mut writer = self.writer.lock();
1051 writer.flush()?;
1052 writer.get_ref().sync_all()?;
1053 self.bytes_since_sync.store(0, Ordering::Relaxed);
1054 Ok(())
1055 }
1056
1057 /// Flush a TxnWalBuffer with single lock acquisition
1058 ///
1059 /// This is the high-performance path for transaction commit:
1060 /// - All writes during the transaction are buffered locally
1061 /// - At commit, this method flushes everything with ONE lock
1062 ///
1063 /// ## Performance
1064 ///
1065 /// ```text
1066 /// 1000 writes with individual flush: 1000 × lock overhead
1067 /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
1068 /// Speedup: ~1000× for lock overhead
1069 /// ```
1070 #[inline]
1071 pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
1072 if buffer.is_empty() {
1073 return Ok(0);
1074 }
1075
1076 let mut writer = self.writer.lock();
1077
1078 if self.encryption.is_enabled() {
1079 // The buffer holds concatenated plaintext frames
1080 // `[content_len][body]...`. Re-frame each as an AEAD envelope bound to
1081 // its own file-relative ordinal, preserving per-record framing (so a
1082 // torn tail discards exactly one record).
1083 let buf = &buffer.buffer;
1084 let mut pos = 0usize;
1085 let mut total_written = 0u64;
1086 while pos + 4 <= buf.len() {
1087 let content_len =
1088 u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
1089 as usize;
1090 pos += 4;
1091 if pos + content_len > buf.len() {
1092 return Err(SochDBError::Corruption(
1093 "txn buffer truncated mid-record during encrypted flush".into(),
1094 ));
1095 }
1096 let body = &buf[pos..pos + content_len];
1097 pos += content_len;
1098 let ord = self.records_in_file.load(Ordering::SeqCst);
1099 let frame = self.encrypt_frame(body, ord)?;
1100 writer.write_all(&frame)?;
1101 // Advance per record only after its bytes are committed, so a
1102 // mid-batch write failure leaves records_in_file == records
1103 // actually written (no ordinal desync for the survivors).
1104 self.records_in_file.fetch_add(1, Ordering::SeqCst);
1105 total_written += frame.len() as u64;
1106 }
1107 let seq = self
1108 .sequence
1109 .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1110 self.bytes_since_sync
1111 .fetch_add(total_written, Ordering::Relaxed);
1112 return Ok(seq);
1113 }
1114
1115 writer.write_all(&buffer.buffer)?;
1116
1117 let seq = self
1118 .sequence
1119 .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1120 self.bytes_since_sync
1121 .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
1122
1123 Ok(seq)
1124 }
1125
1126 /// Get the current size of the WAL file in bytes
1127 pub fn size_bytes(&self) -> u64 {
1128 std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
1129 }
1130
1131 /// Allocate a new transaction ID
1132 pub fn alloc_txn_id(&self) -> u64 {
1133 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1134 }
1135
1136 /// Begin a new transaction
1137 pub fn begin_transaction(&self) -> Result<u64> {
1138 let txn_id = self.alloc_txn_id();
1139 let entry = TxnWalEntry::txn_begin(txn_id);
1140 self.append(&entry)?;
1141 Ok(txn_id)
1142 }
1143
1144 /// Commit a transaction (with fsync for durability)
1145 ///
1146 /// This flushes all pending writes and then fsyncs the commit record.
1147 ///
1148 /// # Durability Guarantee
1149 ///
1150 /// After this method returns `Ok(())`, the commit record is durable on disk.
1151 /// The transaction is guaranteed to survive process crash, OS crash, and
1152 /// power failure (assuming the storage device honors fsync correctly).
1153 ///
1154 /// # Performance
1155 ///
1156 /// Each call performs an fsync (~5ms on HDD, ~0.1ms on NVMe).
1157 /// For high-throughput workloads, use `EventDrivenGroupCommit` which
1158 /// batches multiple commits into a single fsync via `commit_durable_batch()`.
1159 pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
1160 // First flush any pending buffered writes
1161 self.flush()?;
1162
1163 // Then write commit record with fsync
1164 let entry = TxnWalEntry::txn_commit(txn_id);
1165 self.append_sync(&entry)?;
1166 Ok(())
1167 }
1168
1169 /// Commit a batch of transactions with a single fsync (group commit).
1170 ///
1171 /// Writes commit records for all transaction IDs, then performs a single
1172 /// flush + fsync. This amortizes the fsync cost across N transactions,
1173 /// achieving ~N× throughput improvement over individual commits.
1174 ///
1175 /// # Durability Guarantee
1176 ///
1177 /// After this method returns `Ok(())`, ALL transactions in the batch
1178 /// are durable on disk. Either all commit records are visible after
1179 /// crash recovery, or none are (atomic batch durability).
1180 ///
1181 /// # Usage
1182 ///
1183 /// This method is called by `EventDrivenGroupCommit::flush_fn` to
1184 /// implement the group commit pattern. Do not call directly unless
1185 /// you are implementing your own commit batching.
1186 pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
1187 // Write all commit records without flushing (batch them in BufWriter)
1188 for &txn_id in txn_ids {
1189 let entry = TxnWalEntry::txn_commit(txn_id);
1190 self.append_no_flush(&entry)?;
1191 }
1192
1193 // Single flush + fsync for the entire batch
1194 self.flush()?;
1195 self.sync()?;
1196 Ok(())
1197 }
1198
1199 /// Abort a transaction
1200 pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
1201 let entry = TxnWalEntry::txn_abort(txn_id);
1202 self.append(&entry)?;
1203 Ok(())
1204 }
1205
1206 /// Write data within a transaction
1207 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1208 let entry = TxnWalEntry::data(txn_id, key, value);
1209 self.append(&entry)
1210 }
1211
1212 /// Replay WAL for crash recovery
1213 ///
1214 /// Returns (committed_writes, recovered_txn_count)
1215 #[allow(clippy::type_complexity)]
1216 pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
1217 let file = File::open(&self.path)?;
1218 let mut reader = BufReader::new(file);
1219
1220 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1221 std::collections::HashMap::new();
1222 let mut result = Vec::new();
1223 let mut txn_count = 0;
1224 let mut ordinal: u64 = 0;
1225
1226 // Single pass in WAL order. Transaction ids are process-local and
1227 // short-lived CLI processes can reuse the same ids after restart, so
1228 // grouping the entire WAL by txn_id would merge unrelated transactions
1229 // and replay older writes after newer ones.
1230 loop {
1231 match self.read_record(&mut reader, &mut ordinal) {
1232 Ok(entry) => match entry.record_type {
1233 WalRecordType::TxnBegin => {
1234 pending_writes.insert(entry.txn_id, Vec::new());
1235 }
1236 WalRecordType::Data => {
1237 // Accept data for any txn_id we've seen a Begin for,
1238 // and also for txn_ids without a Begin (they might have
1239 // their Begin later in the WAL due to buffered writes).
1240 pending_writes
1241 .entry(entry.txn_id)
1242 .or_insert_with(Vec::new)
1243 .push((entry.key, entry.value));
1244 }
1245 WalRecordType::TxnCommit => {
1246 if let Some(writes) = pending_writes.remove(&entry.txn_id) {
1247 result.extend(writes);
1248 txn_count += 1;
1249 }
1250 }
1251 WalRecordType::TxnAbort => {
1252 pending_writes.remove(&entry.txn_id);
1253 }
1254 _ => {}
1255 },
1256 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1257 break;
1258 }
1259 Err(e) => {
1260 // Encrypted WALs must fail loud on a non-EOF error so a wrong
1261 // key / tamper / corruption can never masquerade as EOF and
1262 // silently drop committed data. (Wrong key is already excluded
1263 // earlier by the keyring canary, so this is genuine corruption
1264 // or tampering.) Plaintext keeps the legacy torn-tail tolerance.
1265 if self.encryption.is_enabled() {
1266 return Err(e);
1267 }
1268 break;
1269 }
1270 }
1271 }
1272
1273 Ok((result, txn_count))
1274 }
1275
1276 /// Replay the WAL up to a PITR target, returning the committed writes whose
1277 /// transactions are included by the target (and the count of such txns).
1278 ///
1279 /// Prefix semantics — recovery "rolls forward" through the WAL and STOPS at
1280 /// the target, so the result is always a state the database actually passed
1281 /// through:
1282 /// - [`RecoveryTarget::Lsn`]`(l)`: include the first `l` WAL records (exact;
1283 /// `l` matches [`Self::sequence`] / `DurableStorage::current_lsn` captured
1284 /// at that point). A transaction whose commit lands after record `l` is
1285 /// excluded (atomic — partial transactions are never applied).
1286 /// - [`RecoveryTarget::Timestamp`]`(t)`: stop at the FIRST transaction whose
1287 /// commit timestamp exceeds `t`. Best-effort on the coarse, possibly
1288 /// non-monotonic commit clock; prefer `Lsn` for an exact cut.
1289 ///
1290 /// Crypto-aware and fail-loud, exactly like [`Self::replay_for_recovery`].
1291 #[allow(clippy::type_complexity)]
1292 pub fn replay_to_target(
1293 &self,
1294 target: RecoveryTarget,
1295 ) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
1296 let file = File::open(&self.path)?;
1297 let mut reader = BufReader::new(file);
1298
1299 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1300 std::collections::HashMap::new();
1301 let mut result = Vec::new();
1302 let mut txn_count = 0;
1303 let mut ordinal: u64 = 0;
1304
1305 loop {
1306 match self.read_record(&mut reader, &mut ordinal) {
1307 Ok(entry) => {
1308 // LSN target: `ordinal` is now this record's 1-based LSN.
1309 // Stop once we pass the target (this record is excluded).
1310 if let RecoveryTarget::Lsn(l) = target {
1311 if ordinal > l {
1312 break;
1313 }
1314 }
1315 match entry.record_type {
1316 WalRecordType::TxnBegin => {
1317 pending_writes.insert(entry.txn_id, Vec::new());
1318 }
1319 WalRecordType::Data => {
1320 pending_writes
1321 .entry(entry.txn_id)
1322 .or_insert_with(Vec::new)
1323 .push((entry.key, entry.value));
1324 }
1325 WalRecordType::TxnCommit => {
1326 // Timestamp target: stop rolling forward at the first
1327 // transaction committed AFTER the target (exclude it).
1328 if let RecoveryTarget::Timestamp(t) = target {
1329 if entry.timestamp_us > t {
1330 break;
1331 }
1332 }
1333 if let Some(writes) = pending_writes.remove(&entry.txn_id) {
1334 result.extend(writes);
1335 txn_count += 1;
1336 }
1337 }
1338 WalRecordType::TxnAbort => {
1339 pending_writes.remove(&entry.txn_id);
1340 }
1341 _ => {}
1342 }
1343 }
1344 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1345 break;
1346 }
1347 Err(e) => {
1348 if self.encryption.is_enabled() {
1349 return Err(e);
1350 }
1351 break;
1352 }
1353 }
1354 }
1355
1356 Ok((result, txn_count))
1357 }
1358
1359 /// Replay WAL with a callback
1360 pub fn replay<F>(&self, mut callback: F) -> Result<u64>
1361 where
1362 F: FnMut(TxnWalEntry) -> Result<()>,
1363 {
1364 let file = File::open(&self.path)?;
1365 let mut reader = BufReader::new(file);
1366 let mut count = 0u64;
1367 let mut ordinal: u64 = 0;
1368
1369 loop {
1370 match self.read_record(&mut reader, &mut ordinal) {
1371 Ok(entry) => {
1372 callback(entry)?;
1373 count += 1;
1374 }
1375 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1376 break;
1377 }
1378 Err(e) => {
1379 // Encrypted: fail loud (never silently drop committed data on a
1380 // non-EOF error). Plaintext: tolerate a trailing incomplete entry.
1381 if self.encryption.is_enabled() {
1382 return Err(e);
1383 }
1384 eprintln!("WAL replay warning: {:?}", e);
1385 break;
1386 }
1387 }
1388 }
1389
1390 Ok(count)
1391 }
1392
1393 /// Truncate WAL (called after successful checkpoint)
1394 ///
1395 /// Flushes any buffered writes, truncates the file to 0 bytes,
1396 /// and resets sequence counters. The file is opened in `O_APPEND`
1397 /// mode so subsequent writes will correctly start at offset 0.
1398 ///
1399 /// **WARNING**: After truncation, all data durability is lost.
1400 /// The in-memory memtable still holds data for the current session,
1401 /// but a crash after truncation means the data cannot be recovered
1402 /// from the WAL.
1403 pub fn truncate(&self) -> Result<()> {
1404 let mut writer = self.writer.lock();
1405 // Flush BufWriter so no stale data is written after truncation
1406 writer.flush()?;
1407 let file = writer.get_ref();
1408 file.set_len(0)?;
1409 file.sync_all()?;
1410 self.sequence.store(0, Ordering::SeqCst);
1411 self.bytes_since_sync.store(0, Ordering::Relaxed);
1412 // The file restarts at offset 0, so per-record AAD ordinals restart too;
1413 // a fresh reader of the truncated file will count from 0 to match.
1414 self.records_in_file.store(0, Ordering::SeqCst);
1415 Ok(())
1416 }
1417
1418 /// Write a checkpoint marker
1419 pub fn write_checkpoint(&self) -> Result<u64> {
1420 let entry = TxnWalEntry::checkpoint(0);
1421 self.append_sync(&entry)
1422 }
1423
1424 /// Write a Compensation Log Record (CLR) for undo operations
1425 ///
1426 /// CLRs are redo-only records that skip past already undone operations
1427 /// during recovery. The undo_next_lsn field tells recovery where to
1428 /// continue undoing after this CLR.
1429 pub fn append_clr(
1430 &self,
1431 txn_id: u64,
1432 _original_lsn: u64,
1433 undo_next_lsn: Option<u64>,
1434 undo_data: &[u8],
1435 ) -> Result<u64> {
1436 // Encode undo_next_lsn into the key field
1437 let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1438 let entry = TxnWalEntry {
1439 record_type: WalRecordType::CompensationLogRecord,
1440 txn_id,
1441 timestamp_us: TxnWalEntry::now_us(),
1442 key, // undo_next_lsn encoded in key
1443 value: undo_data.to_vec(),
1444 };
1445 self.append(&entry)
1446 }
1447
1448 /// Write checkpoint with data (for fuzzy checkpoints)
1449 pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1450 let entry = TxnWalEntry {
1451 record_type: WalRecordType::Checkpoint,
1452 txn_id: 0,
1453 timestamp_us: TxnWalEntry::now_us(),
1454 key: Vec::new(),
1455 value: checkpoint_data.to_vec(),
1456 };
1457 self.append_sync(&entry)
1458 }
1459
1460 /// Write checkpoint end with captured state
1461 pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1462 let entry = TxnWalEntry {
1463 record_type: WalRecordType::CheckpointEnd,
1464 txn_id: 0,
1465 timestamp_us: TxnWalEntry::now_us(),
1466 key: Vec::new(),
1467 value: checkpoint_data.to_vec(),
1468 };
1469 self.append_sync(&entry)
1470 }
1471
1472 /// Get current sequence number
1473 pub fn sequence(&self) -> u64 {
1474 self.sequence.load(Ordering::SeqCst)
1475 }
1476
1477 /// Get bytes written since last sync
1478 pub fn bytes_since_sync(&self) -> u64 {
1479 self.bytes_since_sync.load(Ordering::Relaxed)
1480 }
1481
1482 /// Get path to WAL file
1483 pub fn path(&self) -> &Path {
1484 &self.path
1485 }
1486}
1487
/// Point-in-time snapshot of a transaction WAL's counters, as returned by
/// `TxnWal::stats()`.
#[derive(Clone, Debug, Default)]
pub struct TxnWalStats {
    /// Current WAL sequence counter, i.e. how many records have been assigned
    /// sequence numbers since the counter was last set.
    pub entries_written: u64,
    /// Bytes appended since the last reset by `sync()`/`truncate()`.
    pub bytes_since_sync: u64,
    /// The transaction id the next allocation will hand out.
    pub next_txn_id: u64,
}
1498
1499// ============================================================================
1500// Sharded WAL for Reduced Mutex Contention
1501// ============================================================================
1502
1503/// Sharded Write-Ahead Log for high-concurrency workloads
1504///
1505/// Instead of a single Mutex<File>, uses multiple shards to reduce contention:
1506/// - Writers hash to shard by txn_id
1507/// - Each shard has its own buffer
1508/// - Central coordinator handles fsync ordering
1509///
1510/// Reduces contention from O(1) bottleneck to O(num_shards) parallelism.
1511#[allow(dead_code)]
1512pub struct ShardedWal {
1513 /// Shard writers (txn_id % num_shards selects shard)
1514 shards: Vec<parking_lot::Mutex<WalShard>>,
1515 /// Number of shards (power of 2)
1516 num_shards: usize,
1517 /// Central WAL file for ordered writes
1518 central_writer: parking_lot::Mutex<BufWriter<File>>,
1519 /// Next transaction ID
1520 next_txn_id: AtomicU64,
1521 /// Write sequence (global ordering)
1522 sequence: AtomicU64,
1523 /// Path
1524 path: PathBuf,
1525}
1526
/// In-memory staging area for one shard of a sharded WAL.
struct WalShard {
    /// Serialized records, concatenated in append order.
    buffer: Vec<u8>,
    /// How many records `buffer` currently holds.
    entry_count: usize,
}
1534
1535impl WalShard {
1536 fn new() -> Self {
1537 Self {
1538 buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1539 entry_count: 0,
1540 }
1541 }
1542
1543 fn append(&mut self, entry: &TxnWalEntry) {
1544 let bytes = entry.to_bytes();
1545 self.buffer.extend_from_slice(&bytes);
1546 self.entry_count += 1;
1547 }
1548
1549 fn is_empty(&self) -> bool {
1550 self.buffer.is_empty()
1551 }
1552
1553 fn drain(&mut self) -> Vec<u8> {
1554 self.entry_count = 0;
1555 std::mem::take(&mut self.buffer)
1556 }
1557}
1558
1559impl ShardedWal {
1560 /// Create sharded WAL with specified number of shards
1561 ///
1562 /// Recommended: 4-8 shards for typical server workloads
1563 pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1564 let path = path.as_ref().to_path_buf();
1565
1566 if let Some(parent) = path.parent() {
1567 std::fs::create_dir_all(parent)?;
1568 }
1569
1570 let file = std::fs::OpenOptions::new()
1571 .create(true)
1572 .append(true)
1573 .read(true)
1574 .open(&path)?;
1575
1576 // Round up to power of 2 for fast modulo
1577 let num_shards = num_shards.next_power_of_two();
1578 let shards: Vec<_> = (0..num_shards)
1579 .map(|_| parking_lot::Mutex::new(WalShard::new()))
1580 .collect();
1581
1582 Ok(Self {
1583 shards,
1584 num_shards,
1585 central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1586 next_txn_id: AtomicU64::new(1),
1587 sequence: AtomicU64::new(0),
1588 path,
1589 })
1590 }
1591
1592 /// Get shard index for transaction
1593 #[inline]
1594 fn shard_idx(&self, txn_id: u64) -> usize {
1595 (txn_id as usize) & (self.num_shards - 1)
1596 }
1597
1598 /// Append entry to appropriate shard (lock-free for different txns)
1599 pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1600 let shard_idx = self.shard_idx(entry.txn_id);
1601 let mut shard = self.shards[shard_idx].lock();
1602 shard.append(entry);
1603 self.sequence.fetch_add(1, Ordering::SeqCst)
1604 }
1605
1606 /// Allocate transaction ID
1607 pub fn alloc_txn_id(&self) -> u64 {
1608 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1609 }
1610
1611 /// Flush all shard buffers to central file
1612 pub fn flush(&self) -> Result<()> {
1613 let mut central = self.central_writer.lock();
1614
1615 // Collect all shard buffers (brief lock per shard)
1616 for shard in &self.shards {
1617 let mut shard_guard = shard.lock();
1618 if !shard_guard.is_empty() {
1619 let data = shard_guard.drain();
1620 central.write_all(&data)?;
1621 }
1622 }
1623
1624 central.flush()?;
1625 Ok(())
1626 }
1627
1628 /// Sync to disk (fsync)
1629 pub fn sync(&self) -> Result<()> {
1630 self.flush()?;
1631 let central = self.central_writer.lock();
1632 central.get_ref().sync_all()?;
1633 Ok(())
1634 }
1635
1636 /// Begin transaction
1637 pub fn begin_transaction(&self) -> Result<u64> {
1638 let txn_id = self.alloc_txn_id();
1639 let entry = TxnWalEntry::txn_begin(txn_id);
1640 self.append(&entry);
1641 Ok(txn_id)
1642 }
1643
1644 /// Write data
1645 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1646 let entry = TxnWalEntry::data(txn_id, key, value);
1647 Ok(self.append(&entry))
1648 }
1649
1650 /// Commit transaction
1651 pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1652 let entry = TxnWalEntry::txn_commit(txn_id);
1653 let seq = self.append(&entry);
1654 self.sync()?; // Fsync on commit for durability
1655 Ok(seq)
1656 }
1657
1658 /// Get statistics
1659 pub fn stats(&self) -> ShardedWalStats {
1660 let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1661 for shard in &self.shards {
1662 shard_entry_counts.push(shard.lock().entry_count);
1663 }
1664
1665 ShardedWalStats {
1666 num_shards: self.num_shards,
1667 total_entries: self.sequence.load(Ordering::SeqCst),
1668 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1669 shard_entry_counts,
1670 }
1671 }
1672}
1673
/// Snapshot of a sharded WAL's counters, as produced by `ShardedWal::stats()`.
#[derive(Clone, Debug)]
pub struct ShardedWalStats {
    /// Number of shards (a power of two).
    pub num_shards: usize,
    /// Records appended so far across all shards (the global sequence).
    pub total_entries: u64,
    /// Transaction id the next allocation will hand out.
    pub next_txn_id: u64,
    /// Records currently buffered (not yet flushed) in each shard, by index.
    pub shard_entry_counts: Vec<usize>,
}
1682
/// Counters gathered during a single crash-recovery pass over the WAL.
#[derive(Clone, Debug, Default)]
pub struct CrashRecoveryStats {
    /// Records successfully read from the WAL.
    pub total_records: u64,
    /// Transactions whose commit record was found.
    pub committed_txns: u64,
    /// Transactions that began but never committed (rolled back).
    pub rolled_back_txns: u64,
    /// Transactions ended by an explicit abort record.
    pub aborted_txns: u64,
    /// Key/value writes recovered from committed transactions.
    pub recovered_writes: u64,
    /// Trailing records that could not be read (typically a torn write left
    /// by a crash).
    pub torn_records: u64,
    /// Size of the WAL file, in bytes, when recovery started.
    pub bytes_read: u64,
    /// Wall-clock duration of recovery, in microseconds.
    pub recovery_duration_us: u64,
    /// Largest transaction id seen, used to restart the id counter.
    pub max_txn_id: u64,
}
1705
1706impl TxnWal {
1707 /// Get WAL statistics
1708 pub fn stats(&self) -> TxnWalStats {
1709 TxnWalStats {
1710 entries_written: self.sequence.load(Ordering::SeqCst),
1711 bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1712 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1713 }
1714 }
1715
1716 /// Full crash recovery with detailed statistics
1717 ///
1718 /// This method provides ACID recovery guarantees:
1719 /// 1. **Atomicity**: Uncommitted transactions are rolled back
1720 /// 2. **Durability**: All committed transactions are replayed
1721 /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1722 ///
1723 /// Returns (committed_writes, stats) where committed_writes contains
1724 /// all key-value pairs from committed transactions in order.
1725 #[allow(clippy::type_complexity)]
1726 pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1727 let start_time = std::time::Instant::now();
1728 let file = File::open(&self.path)?;
1729 let file_size = file.metadata()?.len();
1730 let mut reader = BufReader::new(file);
1731
1732 let mut stats = CrashRecoveryStats {
1733 bytes_read: file_size,
1734 ..Default::default()
1735 };
1736
1737 let mut committed_txns: HashSet<u64> = HashSet::new();
1738 let mut aborted_txns: HashSet<u64> = HashSet::new();
1739 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1740 std::collections::HashMap::new();
1741 let mut all_txns: HashSet<u64> = HashSet::new();
1742
1743 // Read all records, stopping at first corruption (torn write)
1744 let mut ordinal: u64 = 0;
1745 loop {
1746 match self.read_record(&mut reader, &mut ordinal) {
1747 Ok(entry) => {
1748 stats.total_records += 1;
1749 if entry.txn_id > stats.max_txn_id {
1750 stats.max_txn_id = entry.txn_id;
1751 }
1752
1753 match entry.record_type {
1754 WalRecordType::TxnBegin => {
1755 pending_writes.insert(entry.txn_id, Vec::new());
1756 all_txns.insert(entry.txn_id);
1757 }
1758 WalRecordType::Data => {
1759 if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1760 writes.push((entry.key, entry.value));
1761 }
1762 }
1763 WalRecordType::TxnCommit => {
1764 committed_txns.insert(entry.txn_id);
1765 }
1766 WalRecordType::TxnAbort => {
1767 pending_writes.remove(&entry.txn_id);
1768 aborted_txns.insert(entry.txn_id);
1769 }
1770 _ => {}
1771 }
1772 }
1773 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1774 // Clean EOF
1775 break;
1776 }
1777 Err(e) => {
1778 // Encrypted: an AEAD auth failure / tamper / corruption is a
1779 // hard error — abort recovery rather than silently truncating
1780 // committed data (wrong key is already caught by the keyring
1781 // canary at open). Plaintext: treat trailing corruption as a
1782 // torn write and stop.
1783 if self.encryption.is_enabled() {
1784 return Err(e);
1785 }
1786 stats.torn_records += 1;
1787 break;
1788 }
1789 }
1790 }
1791
1792 // Collect committed writes
1793 let mut result = Vec::new();
1794 for (txn_id, writes) in &pending_writes {
1795 if committed_txns.contains(txn_id) {
1796 stats.committed_txns += 1;
1797 stats.recovered_writes += writes.len() as u64;
1798 result.extend(writes.clone());
1799 }
1800 }
1801
1802 // Count aborted and rolled-back transactions
1803 stats.aborted_txns = aborted_txns.len() as u64;
1804 stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1805
1806 stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1807
1808 Ok((result, stats))
1809 }
1810}
1811
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A single data record must survive serialize → parse unchanged.
    #[test]
    fn test_wal_entry_roundtrip() {
        let original = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
        let mut reader = std::io::Cursor::new(original.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut reader).unwrap();

        assert_eq!(decoded.record_type, WalRecordType::Data);
        assert_eq!(decoded.txn_id, 42);
        assert_eq!(decoded.key, b"key");
        assert_eq!(decoded.value, b"value");
    }

    /// A committed transaction is replayed in write order after reopening.
    #[test]
    fn test_wal_append_and_replay() {
        let scratch = tempdir().unwrap();
        let wal_file = scratch.path().join("test.wal");

        // Phase 1: log one committed transaction carrying two writes.
        {
            let wal = TxnWal::new(&wal_file).unwrap();
            let txn = wal.begin_transaction().unwrap();
            for (key, value) in [(b"k1", b"v1"), (b"k2", b"v2")] {
                wal.write(txn, key.to_vec(), value.to_vec()).unwrap();
            }
            wal.commit_transaction(txn).unwrap();
        }

        // Phase 2: reopen and replay; both writes come back, in order.
        let reopened = TxnWal::new(&wal_file).unwrap();
        let (replayed, committed) = reopened.replay_for_recovery().unwrap();

        assert_eq!(committed, 1);
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0], (b"k1".to_vec(), b"v1".to_vec()));
        assert_eq!(replayed[1], (b"k2".to_vec(), b"v2".to_vec()));
    }

    /// Writes from a transaction that never committed are discarded on replay.
    #[test]
    fn test_uncommitted_transaction_rollback() {
        let scratch = tempdir().unwrap();
        let wal_file = scratch.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_file).unwrap();

            // A transaction that reaches its commit record...
            let durable = wal.begin_transaction().unwrap();
            wal.write(durable, b"committed".to_vec(), b"yes".to_vec())
                .unwrap();
            wal.commit_transaction(durable).unwrap();

            // ...followed by one that never does (simulated crash).
            let in_flight = wal.begin_transaction().unwrap();
            wal.write(in_flight, b"uncommitted".to_vec(), b"no".to_vec())
                .unwrap();
        }

        // Only the committed transaction may survive replay.
        let reopened = TxnWal::new(&wal_file).unwrap();
        let (replayed, committed) = reopened.replay_for_recovery().unwrap();

        assert_eq!(committed, 1);
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0], (b"committed".to_vec(), b"yes".to_vec()));
    }

    /// An explicitly aborted transaction contributes nothing to replay.
    #[test]
    fn test_aborted_transaction() {
        let scratch = tempdir().unwrap();
        let wal_file = scratch.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_file).unwrap();
            let doomed = wal.begin_transaction().unwrap();
            wal.write(doomed, b"aborted".to_vec(), b"data".to_vec())
                .unwrap();
            wal.abort_transaction(doomed).unwrap();
        }

        let reopened = TxnWal::new(&wal_file).unwrap();
        let (replayed, committed) = reopened.replay_for_recovery().unwrap();

        assert_eq!(committed, 0);
        assert!(replayed.is_empty());
    }

    /// Flipping the trailing CRC byte must make parsing fail.
    #[test]
    fn test_checksum_validation() {
        let mut frame = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec()).to_bytes();

        // Invert every bit of the final (checksum) byte.
        *frame.last_mut().unwrap() ^= 0xFF;

        let mut reader = std::io::Cursor::new(frame);
        assert!(TxnWalEntry::from_reader(&mut reader).is_err());
    }

    /// Mixed workload: committed, aborted and in-flight transactions are each
    /// accounted for correctly in the recovery statistics.
    #[test]
    fn test_crash_recovery_with_stats() {
        let scratch = tempdir().unwrap();
        let wal_file = scratch.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_file).unwrap();

            // txn1: committed, two writes.
            let first = wal.begin_transaction().unwrap();
            for (key, value) in [(b"k1", b"v1"), (b"k2", b"v2")] {
                wal.write(first, key.to_vec(), value.to_vec()).unwrap();
            }
            wal.commit_transaction(first).unwrap();

            // txn2: explicitly aborted.
            let rejected = wal.begin_transaction().unwrap();
            wal.write(rejected, b"aborted_key".to_vec(), b"aborted_val".to_vec())
                .unwrap();
            wal.abort_transaction(rejected).unwrap();

            // txn3: committed, one write.
            let second = wal.begin_transaction().unwrap();
            wal.write(second, b"k3".to_vec(), b"v3".to_vec()).unwrap();
            wal.commit_transaction(second).unwrap();

            // txn4: left open; the "crash" happens before its commit record.
            let dangling = wal.begin_transaction().unwrap();
            wal.write(dangling, b"uncommitted".to_vec(), b"data".to_vec())
                .unwrap();
        }

        let reopened = TxnWal::new(&wal_file).unwrap();
        let (replayed, stats) = reopened.crash_recovery().unwrap();

        // Three writes from the two committed transactions survive.
        assert_eq!(replayed.len(), 3);
        assert_eq!(stats.committed_txns, 2);
        assert_eq!(stats.aborted_txns, 1);
        assert_eq!(stats.rolled_back_txns, 1); // the dangling txn4
        assert_eq!(stats.recovered_writes, 3);
        assert!(stats.recovery_duration_us > 0);
    }

    /// A truncated record at the tail is reported as torn while the valid
    /// prefix of the log is still recovered.
    #[test]
    fn test_torn_write_detection() {
        let scratch = tempdir().unwrap();
        let wal_file = scratch.path().join("test.wal");

        {
            let wal = TxnWal::new(&wal_file).unwrap();
            let txn = wal.begin_transaction().unwrap();
            wal.write(txn, b"key".to_vec(), b"value".to_vec()).unwrap();
            wal.commit_transaction(txn).unwrap();
        }

        // Tack a partial record onto the tail: a 4-byte length prefix followed
        // by only two payload bytes, as an interrupted write would leave behind.
        {
            use std::io::Write;
            let torn_tail: [u8; 6] = [0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF];
            std::fs::OpenOptions::new()
                .append(true)
                .open(&wal_file)
                .unwrap()
                .write_all(&torn_tail)
                .unwrap();
        }

        let reopened = TxnWal::new(&wal_file).unwrap();
        let (replayed, stats) = reopened.crash_recovery().unwrap();

        assert_eq!(replayed.len(), 1);
        assert_eq!(stats.committed_txns, 1);
        assert_eq!(stats.torn_records, 1);
    }

    /// Identical content yields identical CRCs, different content yields a
    /// different CRC, and a serialize/parse roundtrip preserves the CRC.
    #[test]
    fn test_crc32_determinism() {
        /// Build a data record for txn 42 / key "key" with a pinned timestamp,
        /// so that only `value` can influence the checksum.
        fn stamped(value: &[u8]) -> TxnWalEntry {
            let mut entry = TxnWalEntry::data(42, b"key".to_vec(), value.to_vec());
            entry.timestamp_us = 12345;
            entry
        }

        let first = stamped(b"value");
        let twin = stamped(b"value");
        let other = stamped(b"different");

        assert_eq!(first.checksum(), twin.checksum());
        assert_ne!(first.checksum(), other.checksum());

        let mut reader = std::io::Cursor::new(first.to_bytes());
        let decoded = TxnWalEntry::from_reader(&mut reader).unwrap();
        assert_eq!(decoded.checksum(), first.checksum());
    }
}
2036
#[cfg(test)]
mod encryption_wal_tests {
    use super::*;
    use crate::encryption::EncryptionEngine;
    use tempfile::tempdir;

    /// Build an enabled encryption engine whose 32-byte key is `key` repeated.
    fn enc(key: u8) -> Arc<EncryptionEngine> {
        Arc::new(EncryptionEngine::new(&[key; 32]))
    }

    /// Fixed database UUID passed to every encrypted WAL opened in these tests.
    const UUID: [u8; 16] = [9u8; 16];

    /// Write a committed txn (begin/data/commit) and one aborted txn via the
    /// encrypted paths, then reopen with the same key and crash-recover.
    #[test]
    fn encrypted_write_then_recover_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("enc.wal");

        {
            let wal = TxnWal::new_with_encryption(&path, enc(7), UUID, 0).unwrap();
            // committed txn via append + write_no_flush_refs + flush_buffer paths
            let t1 = wal.begin_transaction().unwrap();
            wal.write(t1, b"alpha".to_vec(), b"one".to_vec()).unwrap();
            wal.write_no_flush_refs(t1, b"beta", b"two").unwrap();
            // also exercise the buffer/flush_buffer batch path
            let mut buf = TxnWalBuffer::new(t1);
            buf.append(b"gamma", b"three");
            wal.flush_buffer(&buf).unwrap();
            wal.commit_transaction(t1).unwrap();

            // aborted txn must NOT survive
            let t2 = wal.begin_transaction().unwrap();
            wal.write(t2, b"ghost".to_vec(), b"x".to_vec()).unwrap();
            wal.abort_transaction(t2).unwrap();
            wal.sync().unwrap();
        }

        // On-disk bytes must NOT contain plaintext from ANY of the write paths
        // exercised above: `alpha` (write), `beta` (write_no_flush_refs),
        // `gamma`/`three` (flush_buffer) and `ghost` (aborted txn, still logged).
        // The 1-3 byte values ("one", "two", "x") are too short to check
        // without risking a false match against random ciphertext.
        let raw = std::fs::read(&path).unwrap();
        let must_not_leak: [&[u8]; 5] = [b"alpha", b"beta", b"gamma", b"three", b"ghost"];
        for needle in must_not_leak {
            assert!(
                !contains(&raw, needle),
                "plaintext {:?} found in encrypted WAL",
                String::from_utf8_lossy(needle)
            );
        }

        let wal = TxnWal::new_with_encryption(&path, enc(7), UUID, 0).unwrap();
        let (writes, stats) = wal.crash_recovery().unwrap();
        let keys: Vec<_> = writes.iter().map(|(k, _)| k.clone()).collect();
        assert!(keys.contains(&b"alpha".to_vec()));
        assert!(keys.contains(&b"beta".to_vec()));
        assert!(keys.contains(&b"gamma".to_vec()));
        assert!(!keys.contains(&b"ghost".to_vec()), "aborted txn leaked");
        assert_eq!(stats.committed_txns, 1);
    }

    /// Wrong key at the WAL layer must FAIL LOUD, not silently return empty.
    /// (In production the keyring canary catches this even earlier; this proves
    /// the replay path itself never swallows an AEAD failure as EOF.)
    #[test]
    fn wrong_key_fails_loud_not_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("enc.wal");
        {
            let wal = TxnWal::new_with_encryption(&path, enc(1), UUID, 0).unwrap();
            let t = wal.begin_transaction().unwrap();
            wal.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
            wal.commit_transaction(t).unwrap();
            wal.sync().unwrap();
        }
        // Reopen with the WRONG key: recover_state runs in the constructor and
        // must surface a hard Encryption error rather than an "empty" success.
        match TxnWal::new_with_encryption(&path, enc(2), UUID, 0) {
            Ok(_) => panic!("wrong key opened silently (data-loss bug)"),
            Err(SochDBError::Encryption(_)) => {}
            Err(other) => panic!("expected Encryption error, got {other:?}"),
        }
    }

    /// Tampering with a committed encrypted record must fail recovery loud.
    #[test]
    fn tamper_midstream_fails_loud() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("enc.wal");
        {
            let wal = TxnWal::new_with_encryption(&path, enc(5), UUID, 0).unwrap();
            let t = wal.begin_transaction().unwrap();
            wal.write(t, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            wal.commit_transaction(t).unwrap();
            wal.sync().unwrap();
        }
        // Flip a byte well inside the file (not the final torn-tail region).
        let mut raw = std::fs::read(&path).unwrap();
        let mid = raw.len() / 2;
        raw[mid] ^= 0xFF;
        std::fs::write(&path, &raw).unwrap();

        // Either the constructor's recover_state errors, or a later crash_recovery
        // does — but it must NEVER silently accept tampered ciphertext.
        let failed = match TxnWal::new_with_encryption(&path, enc(5), UUID, 0) {
            Err(_) => true,
            Ok(wal) => wal.crash_recovery().is_err(),
        };
        assert!(failed, "tampered encrypted WAL was silently accepted");
    }

    /// The disabled engine must write a record's bytes EXACTLY as the legacy
    /// `to_bytes()` frame (no added encryption framing / format drift), while the
    /// enabled engine must NOT (it adds the AEAD envelope and is unreadable by the
    /// plaintext reader). Records embed wall-clock timestamps, so we compare a
    /// single fixed entry object against its own `to_bytes()` for determinism.
    #[test]
    fn disabled_engine_is_byte_identical_to_plaintext() {
        let dir = tempdir().unwrap();

        let entry = TxnWalEntry::data(42, b"k1".to_vec(), b"v1".to_vec());
        let golden = entry.to_bytes();

        // Disabled engine: file bytes == to_bytes() exactly.
        let p_dis = dir.path().join("disabled.wal");
        {
            let wal = TxnWal::new_with_encryption(
                &p_dis,
                Arc::new(EncryptionEngine::disabled()),
                [0u8; 16],
                0,
            )
            .unwrap();
            wal.append(&entry).unwrap();
            wal.sync().unwrap();
        }
        assert_eq!(
            std::fs::read(&p_dis).unwrap(),
            golden,
            "disabled-engine append diverged from legacy plaintext frame"
        );

        // Enabled engine: file bytes differ and are NOT plaintext-parseable.
        let p_enc = dir.path().join("enc.wal");
        {
            let wal = TxnWal::new_with_encryption(&p_enc, enc(3), UUID, 0).unwrap();
            wal.append(&entry).unwrap();
            wal.sync().unwrap();
        }
        let enc_bytes = std::fs::read(&p_enc).unwrap();
        assert_ne!(enc_bytes, golden);
        let mut cur = std::io::Cursor::new(&enc_bytes);
        // Short-circuit order matters: `position()` is only consulted when the
        // plaintext parse unexpectedly succeeded.
        assert!(
            TxnWalEntry::from_reader(&mut cur).is_err()
                || cur.position() as usize != enc_bytes.len(),
            "ciphertext frame must not parse cleanly as a plaintext record"
        );
    }

    /// Returns `true` if `needle` occurs as a contiguous subslice of `haystack`.
    ///
    /// An empty needle trivially matches; it is special-cased because
    /// `slice::windows(0)` panics.
    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
        needle.is_empty() || haystack.windows(needle.len()).any(|w| w == needle)
    }
}
2193}