sochdb_storage/txn_wal.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Transaction-Aware WAL for ACID Transactions
19//!
20//! This WAL implementation provides ACID transaction support:
21//! - Atomicity: All writes in a transaction are logged together
22//! - Consistency: Schema validation before commit
23//! - Isolation: Transaction IDs for MVCC
24//! - Durability: fsync after commit
25//!
26//! ## WAL Record Types
27//!
28//! - Data: Key-value write
29//! - TxnBegin: Start of transaction
30//! - TxnCommit: Commit point (durability guarantee)
31//! - TxnAbort: Transaction rollback
32//! - Checkpoint: Snapshot marker for recovery
33//! - SchemaChange: DDL operations
34//!
35//! ## Record Format (Fixed Layout for Torn Write Detection)
36//!
37//! ```text
//! ┌───────────┬──────────┬─────────┬───────────┬─────────┬───────────┬─────────┬─────────┬──────────┐
//! │ Length(4) │ Type(1)  │ TxnId(8)│ Timestamp │ KeyLen  │ ValueLen  │ Key(*)  │ Value(*)│ CRC32(4) │
//! │           │          │         │    (8)    │   (4)   │    (4)    │         │         │          │
//! └───────────┴──────────┴─────────┴───────────┴─────────┴───────────┴─────────┴─────────┴──────────┘
42//! ```
43//!
44//! ## Crash Recovery Guarantees
45//!
46//! 1. **Torn Write Detection**: Length prefix + CRC32 checksum detects partial writes
47//! 2. **Atomic Commit**: Commit record with fsync ensures durability
48//! 3. **Uncommitted Rollback**: Transactions without commit record are discarded
49//! 4. **Checkpoint Safety**: Checkpoint marker allows safe WAL truncation
50
51use crate::encryption::EncryptionEngine;
52use byteorder::{LittleEndian, ReadBytesExt};
53use parking_lot::Mutex;
54use sochdb_core::{Result, SochDBError, WalRecordType};
55use std::cell::Cell;
56use std::collections::HashSet;
57use std::fs::{File, OpenOptions};
58use std::io::{BufReader, BufWriter, Read, Write};
59use std::path::{Path, PathBuf};
60use std::sync::Arc;
61use std::sync::atomic::{AtomicU64, Ordering};
62use std::time::{Instant, SystemTime, UNIX_EPOCH};
63
64// =============================================================================
65// Coarse-Grained Timestamp Caching (Recommendation 5)
66// =============================================================================
67
/// Cache validity period in nanoseconds (1ms - allows ~1000+ writes per refresh)
const CACHE_VALIDITY_NS: u64 = 1_000_000;

thread_local! {
    /// Thread-local timestamp cache: (Instant, cached_timestamp_us)
    ///
    /// The cell is created lazily on a thread's first access with a cached
    /// timestamp of `0`, which `cached_timestamp_us` treats as "never refreshed".
    static TS_CACHE: Cell<(Instant, u64)> = Cell::new((Instant::now(), 0));
}

/// Get the current wall-clock time in microseconds since the UNIX epoch,
/// served from a per-thread cache.
///
/// The wall clock (`SystemTime::now()`) is read only when the cache is older
/// than [`CACHE_VALIDITY_NS`] or has never been filled on this thread. In
/// between, the result is the cached wall-clock value plus the time elapsed on
/// the monotonic `Instant` clock since the cache was filled.
///
/// ## Monotonicity
///
/// Within one cache window the returned values never decrease, because the
/// offset comes from the monotonic clock. A refresh re-reads the wall clock,
/// so if the system clock is stepped backwards a later call can return a
/// smaller value than an earlier one.
///
/// # Panics
///
/// Panics if the system clock is set before the UNIX epoch.
#[inline(always)]
pub fn cached_timestamp_us() -> u64 {
    TS_CACHE.with(|cache| {
        let (base_instant, base_ts) = cache.get();
        let elapsed_ns = base_instant.elapsed().as_nanos() as u64;

        // `base_ts == 0` is the initial placeholder. Without this check the
        // very first call on a thread (made nanoseconds after the cell is
        // created) would take the fast path and return ~0 instead of the
        // real time.
        if base_ts != 0 && elapsed_ns < CACHE_VALIDITY_NS {
            // Fast path: cached wall-clock base + monotonic offset.
            base_ts + elapsed_ns / 1000
        } else {
            // Slow path: refresh the cache from the wall clock.
            let new_ts = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock set before UNIX epoch (1970-01-01)")
                .as_micros() as u64;
            cache.set((Instant::now(), new_ts));
            new_ts
        }
    })
}
114
/// Header size in bytes (without key/value data).
///
/// Sum of the fixed-width fields that precede the key:
/// length(4) + type(1) + txn_id(8) + timestamp(8) + key_len(4) + value_len(4).
const RECORD_HEADER_SIZE: usize = 4 + 1 + 8 + 8 + 4 + 4; // length + type + txn_id + timestamp + key_len + value_len

/// Checksum size in bytes (CRC32), appended after the value.
const CHECKSUM_SIZE: usize = 4;

/// Default capacity for transaction-local WAL buffer (32 KB - typical batch of 100-500 writes)
const DEFAULT_TXN_BUFFER_CAPACITY: usize = 32 * 1024;
123
124// =============================================================================
125// At-rest encryption framing (Task 3B)
126// =============================================================================
127//
128// When the WAL's `EncryptionEngine` is enabled, each record is written as an
129// encrypted frame instead of the plaintext frame above:
130//
131// ```text
132// plaintext frame: [content_len:u32][type|txn|ts|klen|vlen|key|val|crc32]
133// encrypted frame: [outer_len:u32 ][ver|nonce(12)|ciphertext+tag(16)]
134// ```
135//
136// `outer_len` is the length of the AEAD envelope (ciphertext.len()); the inner
137// plaintext that the envelope protects is the record BODY — exactly the bytes
138// after `content_len` in the plaintext frame (`type..crc32`). On decrypt the
139// body is parsed by [`TxnWalEntry::parse_body`], reusing the same field layout +
140// CRC check as the plaintext path.
141//
142// The per-record AAD binds {format_version, db_uuid, dek_epoch, file-relative
143// record ordinal} so a record cannot be reordered, duplicated, spliced from
144// another DB, or read under a downgraded format without failing authentication.
145// The reader reconstructs the identical AAD from its own trusted state (the
146// keyring's db_uuid/epoch and a 0-based counter of records read from this file),
147// NEVER from the attacker-controllable on-disk bytes.
148
149/// Encode a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) — the bytes the
150/// AEAD envelope protects, identical to `to_bytes()` minus the length prefix.
151/// Used by the encrypted write paths to materialize a record before sealing it.
152fn encode_record_body(
153 record_type: WalRecordType,
154 txn_id: u64,
155 timestamp_us: u64,
156 key: &[u8],
157 value: &[u8],
158) -> Vec<u8> {
159 let body_len = (RECORD_HEADER_SIZE - 4) + key.len() + value.len() + CHECKSUM_SIZE;
160 let mut body = Vec::with_capacity(body_len);
161 let mut hasher = crc32fast::Hasher::new();
162 let rt = record_type as u8;
163 body.push(rt);
164 hasher.update(&[rt]);
165 let t = txn_id.to_le_bytes();
166 body.extend_from_slice(&t);
167 hasher.update(&t);
168 let ts = timestamp_us.to_le_bytes();
169 body.extend_from_slice(&ts);
170 hasher.update(&ts);
171 let kl = (key.len() as u32).to_le_bytes();
172 body.extend_from_slice(&kl);
173 hasher.update(&kl);
174 let vl = (value.len() as u32).to_le_bytes();
175 body.extend_from_slice(&vl);
176 hasher.update(&vl);
177 body.extend_from_slice(key);
178 hasher.update(key);
179 body.extend_from_slice(value);
180 hasher.update(value);
181 body.extend_from_slice(&hasher.finalize().to_le_bytes());
182 body
183}
184
/// AAD layout version for the WAL record binding. Part of the on-disk contract.
/// Written as the first byte of every per-record AAD.
const WAL_AAD_VERSION: u8 = 1;
/// AAD length: version(1) + db_uuid(16) + dek_epoch(4) + record_ordinal(8).
const WAL_AAD_LEN: usize = 1 + 16 + 4 + 8;
/// Upper bound on a single WAL frame's on-disk length (512 MiB), to reject a
/// corrupted length prefix before it triggers a multi-GB `read_exact`
/// allocation/DoS.
const MAX_WAL_FRAME_LEN: u32 = 512 * 1024 * 1024;
192
193// =============================================================================
194// Transaction-Local WAL Buffer - Zero Lock Overhead During Transaction
195// =============================================================================
196
/// Transaction-local WAL write buffer
///
/// Collects all writes during a transaction in memory, then flushes
/// everything with a SINGLE lock acquisition at commit time.
///
/// ## Performance Impact
///
/// ```text
/// Without TxnWalBuffer (current):
///   1000 writes × (lock + 9 write_all + unlock) = 1000 lock acquisitions
///   Lock overhead: 1000 × 20ns = 20µs per transaction
///
/// With TxnWalBuffer:
///   1000 writes buffered locally (NO LOCK)
///   1 flush at commit (SINGLE LOCK)
///   Lock overhead: 1 × 20ns = 20ns per transaction
///   Speedup: 1000× for lock overhead
/// ```
///
/// ## Usage
///
/// ```ignore
/// let mut buffer = TxnWalBuffer::new(txn_id);
///
/// // These do NOT acquire any locks
/// buffer.append(b"key1", b"value1");
/// buffer.append(b"key2", b"value2");
/// // ... hundreds of writes ...
///
/// // Hand the accumulated bytes to the WAL in one call
/// buffer.flush_to_wal(&wal)?;
/// ```
#[derive(Debug)]
pub struct TxnWalBuffer {
    /// Transaction ID stamped into every entry appended to this buffer
    txn_id: u64,
    /// Accumulated serialized entries, back to back, each in the plaintext
    /// frame layout (length prefix, header fields, key, value, CRC32)
    buffer: Vec<u8>,
    /// Number of entries buffered
    entry_count: usize,
}
238
239impl TxnWalBuffer {
240 /// Create a new buffer for a transaction
241 #[inline]
242 pub fn new(txn_id: u64) -> Self {
243 Self {
244 txn_id,
245 buffer: Vec::with_capacity(DEFAULT_TXN_BUFFER_CAPACITY),
246 entry_count: 0,
247 }
248 }
249
250 /// Create with specific capacity
251 #[inline]
252 pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
253 Self {
254 txn_id,
255 buffer: Vec::with_capacity(capacity),
256 entry_count: 0,
257 }
258 }
259
260 /// Append a key-value write to the buffer - NO LOCK, NO SYSCALL
261 ///
262 /// Serializes the WAL entry directly to the buffer with CRC32 calculation.
263 /// This is completely lock-free and does not touch the file system.
264 ///
265 /// Uses cached timestamps to eliminate per-write syscall overhead.
266 #[inline]
267 pub fn append(&mut self, key: &[u8], value: &[u8]) {
268 // Use cached timestamp instead of syscall (Recommendation 5)
269 let timestamp_us = cached_timestamp_us();
270
271 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
272 let entry_start = self.buffer.len();
273
274 // Reserve space for length prefix (will fill at end)
275 self.buffer.extend_from_slice(&[0u8; 4]);
276
277 let mut hasher = crc32fast::Hasher::new();
278
279 // Record type (Data = 0)
280 let record_type_byte = WalRecordType::Data as u8;
281 self.buffer.push(record_type_byte);
282 hasher.update(&[record_type_byte]);
283
284 // Transaction ID
285 let txn_bytes = self.txn_id.to_le_bytes();
286 self.buffer.extend_from_slice(&txn_bytes);
287 hasher.update(&txn_bytes);
288
289 // Timestamp
290 let ts_bytes = timestamp_us.to_le_bytes();
291 self.buffer.extend_from_slice(&ts_bytes);
292 hasher.update(&ts_bytes);
293
294 // Key length
295 let key_len_bytes = (key.len() as u32).to_le_bytes();
296 self.buffer.extend_from_slice(&key_len_bytes);
297 hasher.update(&key_len_bytes);
298
299 // Value length
300 let val_len_bytes = (value.len() as u32).to_le_bytes();
301 self.buffer.extend_from_slice(&val_len_bytes);
302 hasher.update(&val_len_bytes);
303
304 // Key data
305 self.buffer.extend_from_slice(key);
306 hasher.update(key);
307
308 // Value data
309 self.buffer.extend_from_slice(value);
310 hasher.update(value);
311
312 // CRC32 checksum
313 self.buffer
314 .extend_from_slice(&hasher.finalize().to_le_bytes());
315
316 // Fill in length prefix (content length, not including the 4-byte length field)
317 let content_len = (total_len - 4) as u32;
318 self.buffer[entry_start..entry_start + 4].copy_from_slice(&content_len.to_le_bytes());
319
320 self.entry_count += 1;
321 }
322
323 /// Flush all buffered entries to WAL - SINGLE LOCK, SINGLE WRITE
324 ///
325 /// Acquires the WAL lock once and writes all accumulated entries.
326 /// Returns the first sequence number assigned to buffered entries.
327 ///
328 /// Note: Use TxnWal::flush_buffer() instead of calling this directly,
329 /// as it properly handles the private writer field.
330 #[inline]
331 pub fn flush_to_wal(&self, wal: &TxnWal) -> Result<u64> {
332 wal.flush_buffer(self)
333 }
334
335 /// Clear the buffer (for reuse)
336 #[inline]
337 pub fn clear(&mut self) {
338 self.buffer.clear();
339 self.entry_count = 0;
340 }
341
342 /// Get number of buffered entries
343 #[inline]
344 pub fn entry_count(&self) -> usize {
345 self.entry_count
346 }
347
348 /// Get total bytes buffered
349 #[inline]
350 pub fn bytes_buffered(&self) -> usize {
351 self.buffer.len()
352 }
353
354 /// Check if buffer is empty
355 #[inline]
356 pub fn is_empty(&self) -> bool {
357 self.buffer.is_empty()
358 }
359}
360
/// WAL entry for transaction-aware operations
///
/// In-memory form of one WAL record. The serialized layout is produced by
/// `to_bytes()` and read back by `from_reader()` / `parse_body()`.
#[derive(Debug, Clone)]
pub struct TxnWalEntry {
    /// Record type
    pub record_type: WalRecordType,
    /// Transaction ID
    pub txn_id: u64,
    /// Timestamp in microseconds
    pub timestamp_us: u64,
    /// Key data (left empty by the constructors for non-`Data` records)
    pub key: Vec<u8>,
    /// Value data (payload for `Data` and `SchemaChange` records)
    pub value: Vec<u8>,
}
375
376impl TxnWalEntry {
377 /// Create a new data entry
378 pub fn data(txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
379 Self {
380 record_type: WalRecordType::Data,
381 txn_id,
382 timestamp_us: Self::now_us(),
383 key,
384 value,
385 }
386 }
387
388 /// Create a transaction begin entry
389 pub fn txn_begin(txn_id: u64) -> Self {
390 Self {
391 record_type: WalRecordType::TxnBegin,
392 txn_id,
393 timestamp_us: Self::now_us(),
394 key: Vec::new(),
395 value: Vec::new(),
396 }
397 }
398
399 /// Create a transaction commit entry
400 pub fn txn_commit(txn_id: u64) -> Self {
401 Self {
402 record_type: WalRecordType::TxnCommit,
403 txn_id,
404 timestamp_us: Self::now_us(),
405 key: Vec::new(),
406 value: Vec::new(),
407 }
408 }
409
410 /// Create a transaction abort entry
411 pub fn txn_abort(txn_id: u64) -> Self {
412 Self {
413 record_type: WalRecordType::TxnAbort,
414 txn_id,
415 timestamp_us: Self::now_us(),
416 key: Vec::new(),
417 value: Vec::new(),
418 }
419 }
420
421 /// Create a checkpoint entry
422 pub fn checkpoint(txn_id: u64) -> Self {
423 Self {
424 record_type: WalRecordType::Checkpoint,
425 txn_id,
426 timestamp_us: Self::now_us(),
427 key: Vec::new(),
428 value: Vec::new(),
429 }
430 }
431
432 /// Create a schema change entry
433 pub fn schema_change(txn_id: u64, schema_data: Vec<u8>) -> Self {
434 Self {
435 record_type: WalRecordType::SchemaChange,
436 txn_id,
437 timestamp_us: Self::now_us(),
438 key: Vec::new(),
439 value: schema_data,
440 }
441 }
442
443 /// Get current time in microseconds (uses cached timestamp)
444 #[inline]
445 fn now_us() -> u64 {
446 cached_timestamp_us()
447 }
448
449 /// Calculate CRC32 checksum for this entry
450 ///
451 /// Uses crc32fast for portable, deterministic checksums.
452 /// The checksum covers all fields except the checksum itself.
453 /// NOTE: This is only used for verification. to_bytes() calculates CRC in single pass.
454 pub fn checksum(&self) -> u32 {
455 let mut hasher = crc32fast::Hasher::new();
456 hasher.update(&[self.record_type as u8]);
457 hasher.update(&self.txn_id.to_le_bytes());
458 hasher.update(&self.timestamp_us.to_le_bytes());
459 hasher.update(&(self.key.len() as u32).to_le_bytes());
460 hasher.update(&(self.value.len() as u32).to_le_bytes());
461 hasher.update(&self.key);
462 hasher.update(&self.value);
463 hasher.finalize()
464 }
465
466 /// Serialize to bytes with single-pass CRC calculation
467 ///
468 /// Optimized to calculate CRC while building the buffer,
469 /// avoiding a second pass over all data.
470 pub fn to_bytes(&self) -> Vec<u8> {
471 let total_len = RECORD_HEADER_SIZE + self.key.len() + self.value.len() + CHECKSUM_SIZE;
472 let mut buf = Vec::with_capacity(total_len);
473 let mut hasher = crc32fast::Hasher::new();
474
475 // Length (not including the length field itself) - not included in CRC
476 let content_len = (total_len - 4) as u32;
477 buf.extend_from_slice(&content_len.to_le_bytes());
478
479 // Record type
480 let record_type_byte = self.record_type as u8;
481 buf.push(record_type_byte);
482 hasher.update(&[record_type_byte]);
483
484 // Transaction ID
485 let txn_bytes = self.txn_id.to_le_bytes();
486 buf.extend_from_slice(&txn_bytes);
487 hasher.update(&txn_bytes);
488
489 // Timestamp
490 let ts_bytes = self.timestamp_us.to_le_bytes();
491 buf.extend_from_slice(&ts_bytes);
492 hasher.update(&ts_bytes);
493
494 // Key length
495 let key_len_bytes = (self.key.len() as u32).to_le_bytes();
496 buf.extend_from_slice(&key_len_bytes);
497 hasher.update(&key_len_bytes);
498
499 // Value length
500 let val_len_bytes = (self.value.len() as u32).to_le_bytes();
501 buf.extend_from_slice(&val_len_bytes);
502 hasher.update(&val_len_bytes);
503
504 // Key data
505 buf.extend_from_slice(&self.key);
506 hasher.update(&self.key);
507
508 // Value data
509 buf.extend_from_slice(&self.value);
510 hasher.update(&self.value);
511
512 // CRC32 Checksum (computed in single pass above)
513 buf.extend_from_slice(&hasher.finalize().to_le_bytes());
514
515 buf
516 }
517
518 /// Deserialize a PLAINTEXT WAL frame from a reader, with torn-write detection.
519 ///
520 /// This is the legacy / unencrypted reader. The live replay path on an
521 /// encrypted WAL does NOT use this — it uses [`TxnWal::read_record`], which
522 /// decrypts first and then calls [`Self::parse_body`]. Keeping this method
523 /// plaintext-only ensures a stray caller can never silently mis-parse
524 /// ciphertext as a plaintext frame.
525 ///
526 /// Returns error if:
527 /// - Length field indicates data is too short or implausibly large
528 /// - CRC32 checksum mismatch (corruption or torn write)
529 /// - Invalid record type
530 pub fn from_reader<R: Read>(reader: &mut R) -> Result<Self> {
531 // Length (allows torn write detection - if length claims more data than exists)
532 let content_len = reader.read_u32::<LittleEndian>()?;
533 if content_len < (RECORD_HEADER_SIZE - 4 + CHECKSUM_SIZE) as u32 {
534 return Err(SochDBError::Corruption("WAL entry too short".into()));
535 }
536 if content_len > MAX_WAL_FRAME_LEN {
537 return Err(SochDBError::Corruption(format!(
538 "WAL entry length {content_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
539 )));
540 }
541
542 // Read the whole body, then parse. A short read here (torn tail) surfaces
543 // as Io(UnexpectedEof), which replay treats as a clean end-of-WAL.
544 let mut body = vec![0u8; content_len as usize];
545 reader.read_exact(&mut body)?;
546 Self::parse_body(&body)
547 }
548
549 /// Parse a record BODY (`type|txn|ts|klen|vlen|key|val|crc32`) and verify its
550 /// CRC. This is the bytes after the `content_len` prefix in a plaintext frame,
551 /// and is exactly what the AEAD envelope protects in an encrypted frame — so
552 /// both the plaintext and decrypt paths share this single parser.
553 pub fn parse_body(body: &[u8]) -> Result<Self> {
554 let mut cur = std::io::Cursor::new(body);
555
556 let record_type_byte = cur.read_u8()?;
557 let record_type = WalRecordType::try_from(record_type_byte).map_err(|_| {
558 SochDBError::Corruption(format!("Invalid record type: {}", record_type_byte))
559 })?;
560
561 let txn_id = cur.read_u64::<LittleEndian>()?;
562 let timestamp_us = cur.read_u64::<LittleEndian>()?;
563 let key_len = cur.read_u32::<LittleEndian>()? as usize;
564 let value_len = cur.read_u32::<LittleEndian>()? as usize;
565
566 let mut key = vec![0u8; key_len];
567 cur.read_exact(&mut key)?;
568 let mut value = vec![0u8; value_len];
569 cur.read_exact(&mut value)?;
570
571 let stored_checksum = cur.read_u32::<LittleEndian>()?;
572
573 let entry = Self {
574 record_type,
575 txn_id,
576 timestamp_us,
577 key,
578 value,
579 };
580
581 // Verify checksum - detects both corruption and torn writes
582 if entry.checksum() != stored_checksum {
583 return Err(SochDBError::Corruption(format!(
584 "WAL checksum mismatch for txn_id {}: expected {}, got {}",
585 txn_id,
586 entry.checksum(),
587 stored_checksum
588 )));
589 }
590
591 Ok(entry)
592 }
593
594 /// The record BODY bytes (`type..crc32`) — i.e. `to_bytes()` without the
595 /// leading 4-byte content_len. This is what gets fed to the AEAD on the
596 /// encrypted write path.
597 fn body_bytes(&self) -> Vec<u8> {
598 let full = self.to_bytes();
599 full[4..].to_vec()
600 }
601}
602
/// Transaction-aware Write-Ahead Log
///
/// Owns the append-mode WAL file behind a mutex-protected `BufWriter`, plus
/// the counters and encryption parameters the write and replay paths use.
pub struct TxnWal {
    /// Path to WAL file
    path: PathBuf,
    /// Buffered writer; the mutex serializes every append to the file
    writer: Mutex<BufWriter<File>>,
    /// Next transaction ID
    next_txn_id: AtomicU64,
    /// Write sequence number (set by recovery to the number of records read
    /// back, then incremented once per appended record)
    sequence: AtomicU64,
    /// Bytes written since last sync
    bytes_since_sync: AtomicU64,
    /// Cached timestamp (microseconds since epoch)
    /// Updated periodically to avoid syscall per write
    cached_timestamp_us: AtomicU64,
    /// At-rest encryption engine. `disabled()` (identity passthrough) for a
    /// plaintext WAL — in which case every write/read path is byte-identical to
    /// the pre-3B format. When enabled, records are written/read as encrypted
    /// frames (see the framing notes above).
    encryption: Arc<EncryptionEngine>,
    /// 16-byte database identity bound into each record's AAD (cross-DB splice
    /// defense). All-zero / unused when encryption is disabled.
    db_uuid: [u8; 16],
    /// Active DEK epoch bound into each record's AAD (downgrade-across-rotation
    /// defense). 0 / unused when encryption is disabled.
    dek_epoch: u32,
    /// File-relative count of records written to the CURRENT WAL file (reset on
    /// truncate). Used as the per-record AAD ordinal so reorder/duplication
    /// within a file fails authentication. All writes hold the `writer` lock, so
    /// this counter is only mutated under that lock; it is read locklessly by the
    /// (single-threaded) replay paths.
    records_in_file: AtomicU64,
}
636
637impl TxnWal {
638 /// Create a new plaintext WAL or open an existing one.
639 ///
640 /// Uses a disabled (passthrough) encryption engine, so the on-disk format is
641 /// byte-identical to the pre-encryption WAL. This is the constructor used by
642 /// every non-live caller (tests, tools); the live durable-storage path uses
643 /// [`Self::new_with_encryption`] to thread the keyring's engine in.
644 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
645 Self::new_with_encryption(path, Arc::new(EncryptionEngine::disabled()), [0u8; 16], 0)
646 }
647
648 /// Create/open a WAL with an explicit at-rest encryption engine.
649 ///
650 /// When `engine` is enabled, records are written and replayed as encrypted
651 /// frames bound to `db_uuid` + `dek_epoch` (see framing notes). When it is
652 /// disabled, this behaves exactly like [`Self::new`].
653 pub fn new_with_encryption<P: AsRef<Path>>(
654 path: P,
655 encryption: Arc<EncryptionEngine>,
656 db_uuid: [u8; 16],
657 dek_epoch: u32,
658 ) -> Result<Self> {
659 let path = path.as_ref().to_path_buf();
660
661 // Ensure parent directory exists
662 if let Some(parent) = path.parent() {
663 std::fs::create_dir_all(parent)?;
664 }
665
666 let file = OpenOptions::new()
667 .create(true)
668 .append(true)
669 .read(true)
670 .open(&path)?;
671
672 // Use 256KB buffer for better batch performance (default is 8KB)
673 // This reduces system calls when buffering many small writes
674 let now_us = cached_timestamp_us();
675
676 let wal = Self {
677 path,
678 writer: Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
679 next_txn_id: AtomicU64::new(1),
680 sequence: AtomicU64::new(0),
681 bytes_since_sync: AtomicU64::new(0),
682 cached_timestamp_us: AtomicU64::new(now_us),
683 encryption,
684 db_uuid,
685 dek_epoch,
686 records_in_file: AtomicU64::new(0),
687 };
688
689 // Recover state from existing WAL
690 wal.recover_state()?;
691
692 Ok(wal)
693 }
694
695 /// Build the per-record AAD for a given file-relative ordinal. The reader
696 /// reconstructs this from its own trusted state, never from on-disk bytes.
697 #[inline]
698 fn record_aad(&self, ordinal: u64) -> [u8; WAL_AAD_LEN] {
699 let mut aad = [0u8; WAL_AAD_LEN];
700 aad[0] = WAL_AAD_VERSION;
701 aad[1..17].copy_from_slice(&self.db_uuid);
702 aad[17..21].copy_from_slice(&self.dek_epoch.to_le_bytes());
703 aad[21..29].copy_from_slice(&ordinal.to_le_bytes());
704 aad
705 }
706
707 /// Frame a single record body for the encrypted write path:
708 /// `[outer_len:u32][version|nonce|ciphertext+tag]`, AAD-bound to `ordinal`.
709 #[inline]
710 fn encrypt_frame(&self, body: &[u8], ordinal: u64) -> Result<Vec<u8>> {
711 let env = self
712 .encryption
713 .encrypt_with_aad(body, &self.record_aad(ordinal))?;
714 let mut out = Vec::with_capacity(4 + env.len());
715 out.extend_from_slice(&(env.len() as u32).to_le_bytes());
716 out.extend_from_slice(&env);
717 Ok(out)
718 }
719
720 /// Read the next record from a replay reader, decrypting if the WAL is
721 /// encrypted. Returns the record and advances `*ordinal`.
722 ///
723 /// Error taxonomy (the linchpin of fail-loud recovery):
724 /// - `Io(UnexpectedEof)` reading the length prefix or a short body read at
725 /// the physical tail ⇒ clean torn-tail / end-of-WAL (callers tolerate).
726 /// - `Encryption(_)` (AEAD auth failure / wrong key / bad envelope) or
727 /// `Corruption(_)` (CRC / bad framing) ⇒ HARD error; callers MUST abort
728 /// recovery rather than treat it as EOF.
729 fn read_record<R: Read>(&self, reader: &mut R, ordinal: &mut u64) -> Result<TxnWalEntry> {
730 if !self.encryption.is_enabled() {
731 let entry = TxnWalEntry::from_reader(reader)?;
732 *ordinal += 1;
733 return Ok(entry);
734 }
735 // Encrypted frame: [outer_len][envelope]
736 let outer_len = reader.read_u32::<LittleEndian>()?; // EOF here = clean end
737 if outer_len > MAX_WAL_FRAME_LEN {
738 return Err(SochDBError::Corruption(format!(
739 "encrypted WAL frame length {outer_len} exceeds maximum {MAX_WAL_FRAME_LEN}"
740 )));
741 }
742 let mut env = vec![0u8; outer_len as usize];
743 reader.read_exact(&mut env)?; // short read = torn tail (UnexpectedEof)
744 // Decrypt with the AAD reconstructed for THIS ordinal. A wrong key, a
745 // reordered/spliced record, or tampering fails here as Encryption(_),
746 // which is a hard error — never a silent EOF.
747 let body = self
748 .encryption
749 .decrypt_with_aad(&env, &self.record_aad(*ordinal))?;
750 let entry = TxnWalEntry::parse_body(&body)?;
751 *ordinal += 1;
752 Ok(entry)
753 }
754
755 /// Recover state (next txn ID, sequence) from existing WAL
756 ///
757 /// To avoid txn_id collisions when multiple processes open the same
758 /// WAL concurrently, we incorporate the PID into the starting txn_id.
759 /// Format: upper 32 bits = PID, lower 32 bits = counter.
760 /// This guarantees uniqueness across processes without coordination.
761 fn recover_state(&self) -> Result<()> {
762 let file = File::open(&self.path)?;
763 let mut reader = BufReader::new(file);
764 let mut count: u64 = 0;
765
766 // Track the max counter (lower 32 bits) for OUR PID only.
767 // Each process owns its own txn_id space: upper 32 bits = PID.
768 // We must NOT use max_txn_id across ALL PIDs, because that would
769 // place us into another PID's ID space and cause collisions.
770 let our_pid = std::process::id() as u64;
771 let pid_base = our_pid << 32;
772 let mut max_our_counter: u64 = 0;
773 let mut ordinal: u64 = 0;
774
775 loop {
776 match self.read_record(&mut reader, &mut ordinal) {
777 Ok(entry) => {
778 count += 1;
779 // Only track counters from entries that belong to our PID
780 let entry_pid = entry.txn_id >> 32;
781 if entry_pid == our_pid {
782 let entry_counter = entry.txn_id & 0xFFFF_FFFF;
783 if entry_counter > max_our_counter {
784 max_our_counter = entry_counter;
785 }
786 }
787 }
788 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
789 break;
790 }
791 Err(e) => {
792 // Encrypted: a non-EOF error (AEAD auth failure / tamper /
793 // corruption) is NEVER a torn tail — fail loud rather than
794 // silently truncate. Plaintext: tolerate trailing corruption
795 // as an incomplete final write (legacy behavior).
796 if self.encryption.is_enabled() {
797 return Err(e);
798 }
799 break;
800 }
801 }
802 }
803
804 // Start from pid_base + (max counter we've seen for our PID + 1).
805 // This ensures:
806 // 1. Unique across processes (different PID → different upper 32 bits)
807 // 2. Unique within this process even if PID is recycled (we skip
808 // over any counters already used by a previous process with our PID)
809 let next_id = pid_base + max_our_counter + 1;
810
811 self.next_txn_id.store(next_id, Ordering::SeqCst);
812 self.sequence.store(count, Ordering::SeqCst);
813 // The current file already holds `count` records, so the next encrypted
814 // write must continue the AAD ordinal sequence from there.
815 self.records_in_file.store(count, Ordering::SeqCst);
816
817 Ok(())
818 }
819
820 /// Get cached timestamp, updating if stale (>1ms old)
821 ///
822 /// This avoids a syscall per write by caching the timestamp.
823 /// For WAL purposes, ~1ms granularity is sufficient.
824 #[inline]
825 fn get_cached_timestamp(&self) -> u64 {
826 // Fast path: use cached value (no syscall)
827 let cached = self.cached_timestamp_us.load(Ordering::Relaxed);
828
829 // Refresh occasionally (every ~1000 writes or when sequence wraps)
830 // This is a very cheap check since sequence is incremented atomically anyway
831 let seq = self.sequence.load(Ordering::Relaxed);
832 if seq & 0x3FF == 0 {
833 // Refresh every 1024 writes
834 let now_us = cached_timestamp_us();
835 self.cached_timestamp_us.store(now_us, Ordering::Relaxed);
836 return now_us;
837 }
838
839 cached
840 }
841
842 /// Append an entry to the WAL
843 ///
844 /// Returns the sequence number of this write.
845 ///
846 /// # Durability Contract
847 ///
848 /// This method writes data to the BufWriter but does **NOT** fsync.
849 /// The data is NOT durable until `flush()` + `sync()` are called.
850 ///
851 /// **For transaction commits, use `commit_transaction()` or `commit_durable()`**
852 /// which enforce fsync. This method is intentionally non-durable for
853 /// batching data writes within a transaction (pre-commit records).
854 ///
855 /// The group commit path (`EventDrivenGroupCommit`) is responsible for
856 /// calling `flush()` + `sync()` after batching multiple commit records.
857 pub fn append(&self, entry: &TxnWalEntry) -> Result<u64> {
858 let mut writer = self.writer.lock();
859 let bytes = self.frame_under_lock(entry)?;
860
861 writer.write_all(&bytes)?;
862 // Don't flush here - BufWriter will batch writes automatically.
863 // Call flush() explicitly before sync() or commit().
864
865 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
866 self.bytes_since_sync
867 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
868
869 Ok(seq)
870 }
871
872 /// Serialize one entry into its on-disk frame, allocating the per-record
873 /// ordinal when encrypted. MUST be called with the `writer` lock held so the
874 /// ordinal matches the order bytes hit the file.
875 #[inline]
876 fn frame_under_lock(&self, entry: &TxnWalEntry) -> Result<Vec<u8>> {
877 if self.encryption.is_enabled() {
878 let ord = self.records_in_file.fetch_add(1, Ordering::SeqCst);
879 self.encrypt_frame(&entry.body_bytes(), ord)
880 } else {
881 Ok(entry.to_bytes())
882 }
883 }
884
885 /// Append entry without flushing (for batched writes)
886 ///
887 /// Caller must call flush() or sync() afterward to ensure durability.
888 #[inline]
889 pub fn append_no_flush(&self, entry: &TxnWalEntry) -> Result<u64> {
890 let mut writer = self.writer.lock();
891 let bytes = self.frame_under_lock(entry)?;
892
893 writer.write_all(&bytes)?;
894 // No flush - let BufWriter buffer the writes
895
896 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
897 self.bytes_since_sync
898 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
899
900 Ok(seq)
901 }
902
903 /// Write data without flushing (for batched writes within a transaction)
904 #[inline]
905 pub fn write_no_flush(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
906 let entry = TxnWalEntry::data(txn_id, key, value);
907 self.append_no_flush(&entry)
908 }
909
910 /// Write data from slices without any allocation
911 ///
912 /// This is the fastest path for writing - no intermediate Vec allocations.
913 /// Calculates CRC32 while serializing directly to BufWriter.
914 #[inline]
915 pub fn write_no_flush_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<u64> {
916 // Use coarse timestamp (cached every ~1ms) instead of syscall per write
917 let timestamp_us = self.get_cached_timestamp();
918
919 let total_len = RECORD_HEADER_SIZE + key.len() + value.len() + CHECKSUM_SIZE;
920
921 let mut writer = self.writer.lock();
922
923 // Encrypted mode cannot stream field-by-field (the AEAD needs the whole
924 // body to compute its synthetic IV + tag), so materialize the body into a
925 // scratch buffer, seal it, and write one framed envelope. The plaintext
926 // path below keeps its zero-alloc streaming fast path untouched.
927 if self.encryption.is_enabled() {
928 let body = encode_record_body(WalRecordType::Data, txn_id, timestamp_us, key, value);
929 let ord = self.records_in_file.fetch_add(1, Ordering::SeqCst);
930 let frame = self.encrypt_frame(&body, ord)?;
931 writer.write_all(&frame)?;
932 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
933 self.bytes_since_sync
934 .fetch_add(frame.len() as u64, Ordering::Relaxed);
935 return Ok(seq);
936 }
937
938 let mut hasher = crc32fast::Hasher::new();
939
940 // Length (not included in CRC)
941 let content_len = (total_len - 4) as u32;
942 writer.write_all(&content_len.to_le_bytes())?;
943
944 // Record type
945 let record_type_byte = WalRecordType::Data as u8;
946 writer.write_all(&[record_type_byte])?;
947 hasher.update(&[record_type_byte]);
948
949 // Transaction ID
950 let txn_bytes = txn_id.to_le_bytes();
951 writer.write_all(&txn_bytes)?;
952 hasher.update(&txn_bytes);
953
954 // Timestamp
955 let ts_bytes = timestamp_us.to_le_bytes();
956 writer.write_all(&ts_bytes)?;
957 hasher.update(&ts_bytes);
958
959 // Key length
960 let key_len_bytes = (key.len() as u32).to_le_bytes();
961 writer.write_all(&key_len_bytes)?;
962 hasher.update(&key_len_bytes);
963
964 // Value length
965 let val_len_bytes = (value.len() as u32).to_le_bytes();
966 writer.write_all(&val_len_bytes)?;
967 hasher.update(&val_len_bytes);
968
969 // Key data
970 writer.write_all(key)?;
971 hasher.update(key);
972
973 // Value data
974 writer.write_all(value)?;
975 hasher.update(value);
976
977 // CRC32 checksum
978 writer.write_all(&hasher.finalize().to_le_bytes())?;
979
980 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
981 self.bytes_since_sync
982 .fetch_add(total_len as u64, Ordering::Relaxed);
983
984 Ok(seq)
985 }
986
987 /// Flush pending writes to kernel buffer (but not to disk)
988 pub fn flush(&self) -> Result<()> {
989 let mut writer = self.writer.lock();
990 writer.flush()?;
991 Ok(())
992 }
993
994 /// Append and immediately sync for durability
995 pub fn append_sync(&self, entry: &TxnWalEntry) -> Result<u64> {
996 let seq = self.append(entry)?;
997 self.sync()?;
998 Ok(seq)
999 }
1000
1001 /// Force sync to disk.
1002 ///
1003 /// Must flush the BufWriter BEFORE fsync: `get_ref()` reaches the raw File
1004 /// and bypasses the 256 KB buffer, so without an explicit `flush()` the
1005 /// just-appended commit/checkpoint record may still sit in userspace and
1006 /// `sync_all()` would fsync stale bytes — silently breaking the durability
1007 /// guarantee `append_sync`/`commit`/`checkpoint` advertise.
1008 pub fn sync(&self) -> Result<()> {
1009 let mut writer = self.writer.lock();
1010 writer.flush()?;
1011 writer.get_ref().sync_all()?;
1012 self.bytes_since_sync.store(0, Ordering::Relaxed);
1013 Ok(())
1014 }
1015
1016 /// Flush a TxnWalBuffer with single lock acquisition
1017 ///
1018 /// This is the high-performance path for transaction commit:
1019 /// - All writes during the transaction are buffered locally
1020 /// - At commit, this method flushes everything with ONE lock
1021 ///
1022 /// ## Performance
1023 ///
1024 /// ```text
1025 /// 1000 writes with individual flush: 1000 × lock overhead
1026 /// 1000 writes with buffer + flush_buffer: 1 × lock overhead
1027 /// Speedup: ~1000× for lock overhead
1028 /// ```
1029 #[inline]
1030 pub fn flush_buffer(&self, buffer: &TxnWalBuffer) -> Result<u64> {
1031 if buffer.is_empty() {
1032 return Ok(0);
1033 }
1034
1035 let mut writer = self.writer.lock();
1036
1037 if self.encryption.is_enabled() {
1038 // The buffer holds concatenated plaintext frames
1039 // `[content_len][body]...`. Re-frame each as an AEAD envelope bound to
1040 // its own file-relative ordinal, preserving per-record framing (so a
1041 // torn tail discards exactly one record).
1042 let buf = &buffer.buffer;
1043 let mut pos = 0usize;
1044 let mut total_written = 0u64;
1045 while pos + 4 <= buf.len() {
1046 let content_len =
1047 u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
1048 as usize;
1049 pos += 4;
1050 if pos + content_len > buf.len() {
1051 return Err(SochDBError::Corruption(
1052 "txn buffer truncated mid-record during encrypted flush".into(),
1053 ));
1054 }
1055 let body = &buf[pos..pos + content_len];
1056 pos += content_len;
1057 let ord = self.records_in_file.fetch_add(1, Ordering::SeqCst);
1058 let frame = self.encrypt_frame(body, ord)?;
1059 writer.write_all(&frame)?;
1060 total_written += frame.len() as u64;
1061 }
1062 let seq = self
1063 .sequence
1064 .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1065 self.bytes_since_sync
1066 .fetch_add(total_written, Ordering::Relaxed);
1067 return Ok(seq);
1068 }
1069
1070 writer.write_all(&buffer.buffer)?;
1071
1072 let seq = self
1073 .sequence
1074 .fetch_add(buffer.entry_count as u64, Ordering::SeqCst);
1075 self.bytes_since_sync
1076 .fetch_add(buffer.buffer.len() as u64, Ordering::Relaxed);
1077
1078 Ok(seq)
1079 }
1080
1081 /// Get the current size of the WAL file in bytes
1082 pub fn size_bytes(&self) -> u64 {
1083 std::fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0)
1084 }
1085
/// Allocate a new transaction ID.
///
/// Atomically post-increments the `next_txn_id` counter and returns the
/// value it held before the increment. No WAL record is written.
pub fn alloc_txn_id(&self) -> u64 {
    self.next_txn_id.fetch_add(1, Ordering::SeqCst)
}
1090
1091 /// Begin a new transaction
1092 pub fn begin_transaction(&self) -> Result<u64> {
1093 let txn_id = self.alloc_txn_id();
1094 let entry = TxnWalEntry::txn_begin(txn_id);
1095 self.append(&entry)?;
1096 Ok(txn_id)
1097 }
1098
1099 /// Commit a transaction (with fsync for durability)
1100 ///
1101 /// This flushes all pending writes and then fsyncs the commit record.
1102 ///
1103 /// # Durability Guarantee
1104 ///
1105 /// After this method returns `Ok(())`, the commit record is durable on disk.
1106 /// The transaction is guaranteed to survive process crash, OS crash, and
1107 /// power failure (assuming the storage device honors fsync correctly).
1108 ///
1109 /// # Performance
1110 ///
1111 /// Each call performs an fsync (~5ms on HDD, ~0.1ms on NVMe).
1112 /// For high-throughput workloads, use `EventDrivenGroupCommit` which
1113 /// batches multiple commits into a single fsync via `commit_durable_batch()`.
1114 pub fn commit_transaction(&self, txn_id: u64) -> Result<()> {
1115 // First flush any pending buffered writes
1116 self.flush()?;
1117
1118 // Then write commit record with fsync
1119 let entry = TxnWalEntry::txn_commit(txn_id);
1120 self.append_sync(&entry)?;
1121 Ok(())
1122 }
1123
1124 /// Commit a batch of transactions with a single fsync (group commit).
1125 ///
1126 /// Writes commit records for all transaction IDs, then performs a single
1127 /// flush + fsync. This amortizes the fsync cost across N transactions,
1128 /// achieving ~N× throughput improvement over individual commits.
1129 ///
1130 /// # Durability Guarantee
1131 ///
1132 /// After this method returns `Ok(())`, ALL transactions in the batch
1133 /// are durable on disk. Either all commit records are visible after
1134 /// crash recovery, or none are (atomic batch durability).
1135 ///
1136 /// # Usage
1137 ///
1138 /// This method is called by `EventDrivenGroupCommit::flush_fn` to
1139 /// implement the group commit pattern. Do not call directly unless
1140 /// you are implementing your own commit batching.
1141 pub fn commit_durable_batch(&self, txn_ids: &[u64]) -> Result<()> {
1142 // Write all commit records without flushing (batch them in BufWriter)
1143 for &txn_id in txn_ids {
1144 let entry = TxnWalEntry::txn_commit(txn_id);
1145 self.append_no_flush(&entry)?;
1146 }
1147
1148 // Single flush + fsync for the entire batch
1149 self.flush()?;
1150 self.sync()?;
1151 Ok(())
1152 }
1153
1154 /// Abort a transaction
1155 pub fn abort_transaction(&self, txn_id: u64) -> Result<()> {
1156 let entry = TxnWalEntry::txn_abort(txn_id);
1157 self.append(&entry)?;
1158 Ok(())
1159 }
1160
1161 /// Write data within a transaction
1162 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1163 let entry = TxnWalEntry::data(txn_id, key, value);
1164 self.append(&entry)
1165 }
1166
1167 /// Replay WAL for crash recovery
1168 ///
1169 /// Returns (committed_writes, recovered_txn_count)
1170 #[allow(clippy::type_complexity)]
1171 pub fn replay_for_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, usize)> {
1172 let file = File::open(&self.path)?;
1173 let mut reader = BufReader::new(file);
1174
1175 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1176 std::collections::HashMap::new();
1177 let mut result = Vec::new();
1178 let mut txn_count = 0;
1179 let mut ordinal: u64 = 0;
1180
1181 // Single pass in WAL order. Transaction ids are process-local and
1182 // short-lived CLI processes can reuse the same ids after restart, so
1183 // grouping the entire WAL by txn_id would merge unrelated transactions
1184 // and replay older writes after newer ones.
1185 loop {
1186 match self.read_record(&mut reader, &mut ordinal) {
1187 Ok(entry) => match entry.record_type {
1188 WalRecordType::TxnBegin => {
1189 pending_writes.insert(entry.txn_id, Vec::new());
1190 }
1191 WalRecordType::Data => {
1192 // Accept data for any txn_id we've seen a Begin for,
1193 // and also for txn_ids without a Begin (they might have
1194 // their Begin later in the WAL due to buffered writes).
1195 pending_writes
1196 .entry(entry.txn_id)
1197 .or_insert_with(Vec::new)
1198 .push((entry.key, entry.value));
1199 }
1200 WalRecordType::TxnCommit => {
1201 if let Some(writes) = pending_writes.remove(&entry.txn_id) {
1202 result.extend(writes);
1203 txn_count += 1;
1204 }
1205 }
1206 WalRecordType::TxnAbort => {
1207 pending_writes.remove(&entry.txn_id);
1208 }
1209 _ => {}
1210 },
1211 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1212 break;
1213 }
1214 Err(e) => {
1215 // Encrypted WALs must fail loud on a non-EOF error so a wrong
1216 // key / tamper / corruption can never masquerade as EOF and
1217 // silently drop committed data. (Wrong key is already excluded
1218 // earlier by the keyring canary, so this is genuine corruption
1219 // or tampering.) Plaintext keeps the legacy torn-tail tolerance.
1220 if self.encryption.is_enabled() {
1221 return Err(e);
1222 }
1223 break;
1224 }
1225 }
1226 }
1227
1228 Ok((result, txn_count))
1229 }
1230
1231 /// Replay WAL with a callback
1232 pub fn replay<F>(&self, mut callback: F) -> Result<u64>
1233 where
1234 F: FnMut(TxnWalEntry) -> Result<()>,
1235 {
1236 let file = File::open(&self.path)?;
1237 let mut reader = BufReader::new(file);
1238 let mut count = 0u64;
1239 let mut ordinal: u64 = 0;
1240
1241 loop {
1242 match self.read_record(&mut reader, &mut ordinal) {
1243 Ok(entry) => {
1244 callback(entry)?;
1245 count += 1;
1246 }
1247 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1248 break;
1249 }
1250 Err(e) => {
1251 // Encrypted: fail loud (never silently drop committed data on a
1252 // non-EOF error). Plaintext: tolerate a trailing incomplete entry.
1253 if self.encryption.is_enabled() {
1254 return Err(e);
1255 }
1256 eprintln!("WAL replay warning: {:?}", e);
1257 break;
1258 }
1259 }
1260 }
1261
1262 Ok(count)
1263 }
1264
1265 /// Truncate WAL (called after successful checkpoint)
1266 ///
1267 /// Flushes any buffered writes, truncates the file to 0 bytes,
1268 /// and resets sequence counters. The file is opened in `O_APPEND`
1269 /// mode so subsequent writes will correctly start at offset 0.
1270 ///
1271 /// **WARNING**: After truncation, all data durability is lost.
1272 /// The in-memory memtable still holds data for the current session,
1273 /// but a crash after truncation means the data cannot be recovered
1274 /// from the WAL.
1275 pub fn truncate(&self) -> Result<()> {
1276 let mut writer = self.writer.lock();
1277 // Flush BufWriter so no stale data is written after truncation
1278 writer.flush()?;
1279 let file = writer.get_ref();
1280 file.set_len(0)?;
1281 file.sync_all()?;
1282 self.sequence.store(0, Ordering::SeqCst);
1283 self.bytes_since_sync.store(0, Ordering::Relaxed);
1284 // The file restarts at offset 0, so per-record AAD ordinals restart too;
1285 // a fresh reader of the truncated file will count from 0 to match.
1286 self.records_in_file.store(0, Ordering::SeqCst);
1287 Ok(())
1288 }
1289
1290 /// Write a checkpoint marker
1291 pub fn write_checkpoint(&self) -> Result<u64> {
1292 let entry = TxnWalEntry::checkpoint(0);
1293 self.append_sync(&entry)
1294 }
1295
1296 /// Write a Compensation Log Record (CLR) for undo operations
1297 ///
1298 /// CLRs are redo-only records that skip past already undone operations
1299 /// during recovery. The undo_next_lsn field tells recovery where to
1300 /// continue undoing after this CLR.
1301 pub fn append_clr(
1302 &self,
1303 txn_id: u64,
1304 _original_lsn: u64,
1305 undo_next_lsn: Option<u64>,
1306 undo_data: &[u8],
1307 ) -> Result<u64> {
1308 // Encode undo_next_lsn into the key field
1309 let key = undo_next_lsn.unwrap_or(0).to_le_bytes().to_vec();
1310 let entry = TxnWalEntry {
1311 record_type: WalRecordType::CompensationLogRecord,
1312 txn_id,
1313 timestamp_us: TxnWalEntry::now_us(),
1314 key, // undo_next_lsn encoded in key
1315 value: undo_data.to_vec(),
1316 };
1317 self.append(&entry)
1318 }
1319
1320 /// Write checkpoint with data (for fuzzy checkpoints)
1321 pub fn write_checkpoint_with_data(&self, checkpoint_data: &[u8]) -> Result<u64> {
1322 let entry = TxnWalEntry {
1323 record_type: WalRecordType::Checkpoint,
1324 txn_id: 0,
1325 timestamp_us: TxnWalEntry::now_us(),
1326 key: Vec::new(),
1327 value: checkpoint_data.to_vec(),
1328 };
1329 self.append_sync(&entry)
1330 }
1331
1332 /// Write checkpoint end with captured state
1333 pub fn write_checkpoint_end(&self, checkpoint_data: &[u8]) -> Result<u64> {
1334 let entry = TxnWalEntry {
1335 record_type: WalRecordType::CheckpointEnd,
1336 txn_id: 0,
1337 timestamp_us: TxnWalEntry::now_us(),
1338 key: Vec::new(),
1339 value: checkpoint_data.to_vec(),
1340 };
1341 self.append_sync(&entry)
1342 }
1343
/// Get current sequence number.
///
/// This is the value the next appended entry will be assigned; `truncate`
/// resets it to 0.
pub fn sequence(&self) -> u64 {
    self.sequence.load(Ordering::SeqCst)
}
1348
/// Get bytes written since last sync.
///
/// The counter grows with every appended frame and is reset to 0 by
/// `sync()` and `truncate()`.
pub fn bytes_since_sync(&self) -> u64 {
    self.bytes_since_sync.load(Ordering::Relaxed)
}
1353
/// Get path to WAL file (the same path the replay and recovery methods open).
pub fn path(&self) -> &Path {
    &self.path
}
1358}
1359
/// Statistics about WAL state, as reported by `TxnWal::stats`.
#[derive(Debug, Clone, Default)]
pub struct TxnWalStats {
    /// Number of entries written (value of the WAL sequence counter, which
    /// `truncate` resets to 0)
    pub entries_written: u64,
    /// Bytes written since last sync
    pub bytes_since_sync: u64,
    /// Current transaction ID counter (the id the next allocation returns)
    pub next_txn_id: u64,
}
1370
1371// ============================================================================
1372// Sharded WAL for Reduced Mutex Contention
1373// ============================================================================
1374
/// Sharded Write-Ahead Log for high-concurrency workloads
///
/// Instead of funnelling every append through one lock, entries are buffered
/// in several independently locked shards:
/// - Writers pick a shard from the low bits of the txn_id
/// - Each shard has its own in-memory buffer
/// - `flush`/`sync` drain the shards into the central file, shard by shard
///
/// Appends for transactions that map to different shards do not contend on
/// the same mutex.
// NOTE(review): dead_code is allowed here, which suggests some fields or the
// type itself are not used yet - confirm before relying on it.
#[allow(dead_code)]
pub struct ShardedWal {
    /// Shard writers (low bits of txn_id select the shard)
    shards: Vec<parking_lot::Mutex<WalShard>>,
    /// Number of shards (rounded up to a power of 2 by `new`)
    num_shards: usize,
    /// Central WAL file that shard buffers are drained into
    central_writer: parking_lot::Mutex<BufWriter<File>>,
    /// Next transaction ID
    next_txn_id: AtomicU64,
    /// Write sequence (global count of appended entries)
    sequence: AtomicU64,
    /// Path of the central WAL file
    path: PathBuf,
}
1398
/// Individual WAL shard buffer: serialized entries waiting to be drained
/// into the central WAL file.
struct WalShard {
    /// Buffered entries for this shard (concatenated serialized frames)
    buffer: Vec<u8>,
    /// Number of entries buffered since the last drain
    entry_count: usize,
}
1406
1407impl WalShard {
1408 fn new() -> Self {
1409 Self {
1410 buffer: Vec::with_capacity(64 * 1024), // 64KB per shard
1411 entry_count: 0,
1412 }
1413 }
1414
1415 fn append(&mut self, entry: &TxnWalEntry) {
1416 let bytes = entry.to_bytes();
1417 self.buffer.extend_from_slice(&bytes);
1418 self.entry_count += 1;
1419 }
1420
1421 fn is_empty(&self) -> bool {
1422 self.buffer.is_empty()
1423 }
1424
1425 fn drain(&mut self) -> Vec<u8> {
1426 self.entry_count = 0;
1427 std::mem::take(&mut self.buffer)
1428 }
1429}
1430
1431impl ShardedWal {
1432 /// Create sharded WAL with specified number of shards
1433 ///
1434 /// Recommended: 4-8 shards for typical server workloads
1435 pub fn new<P: AsRef<Path>>(path: P, num_shards: usize) -> Result<Self> {
1436 let path = path.as_ref().to_path_buf();
1437
1438 if let Some(parent) = path.parent() {
1439 std::fs::create_dir_all(parent)?;
1440 }
1441
1442 let file = std::fs::OpenOptions::new()
1443 .create(true)
1444 .append(true)
1445 .read(true)
1446 .open(&path)?;
1447
1448 // Round up to power of 2 for fast modulo
1449 let num_shards = num_shards.next_power_of_two();
1450 let shards: Vec<_> = (0..num_shards)
1451 .map(|_| parking_lot::Mutex::new(WalShard::new()))
1452 .collect();
1453
1454 Ok(Self {
1455 shards,
1456 num_shards,
1457 central_writer: parking_lot::Mutex::new(BufWriter::with_capacity(256 * 1024, file)),
1458 next_txn_id: AtomicU64::new(1),
1459 sequence: AtomicU64::new(0),
1460 path,
1461 })
1462 }
1463
1464 /// Get shard index for transaction
1465 #[inline]
1466 fn shard_idx(&self, txn_id: u64) -> usize {
1467 (txn_id as usize) & (self.num_shards - 1)
1468 }
1469
1470 /// Append entry to appropriate shard (lock-free for different txns)
1471 pub fn append(&self, entry: &TxnWalEntry) -> u64 {
1472 let shard_idx = self.shard_idx(entry.txn_id);
1473 let mut shard = self.shards[shard_idx].lock();
1474 shard.append(entry);
1475 self.sequence.fetch_add(1, Ordering::SeqCst)
1476 }
1477
1478 /// Allocate transaction ID
1479 pub fn alloc_txn_id(&self) -> u64 {
1480 self.next_txn_id.fetch_add(1, Ordering::SeqCst)
1481 }
1482
1483 /// Flush all shard buffers to central file
1484 pub fn flush(&self) -> Result<()> {
1485 let mut central = self.central_writer.lock();
1486
1487 // Collect all shard buffers (brief lock per shard)
1488 for shard in &self.shards {
1489 let mut shard_guard = shard.lock();
1490 if !shard_guard.is_empty() {
1491 let data = shard_guard.drain();
1492 central.write_all(&data)?;
1493 }
1494 }
1495
1496 central.flush()?;
1497 Ok(())
1498 }
1499
1500 /// Sync to disk (fsync)
1501 pub fn sync(&self) -> Result<()> {
1502 self.flush()?;
1503 let central = self.central_writer.lock();
1504 central.get_ref().sync_all()?;
1505 Ok(())
1506 }
1507
1508 /// Begin transaction
1509 pub fn begin_transaction(&self) -> Result<u64> {
1510 let txn_id = self.alloc_txn_id();
1511 let entry = TxnWalEntry::txn_begin(txn_id);
1512 self.append(&entry);
1513 Ok(txn_id)
1514 }
1515
1516 /// Write data
1517 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<u64> {
1518 let entry = TxnWalEntry::data(txn_id, key, value);
1519 Ok(self.append(&entry))
1520 }
1521
1522 /// Commit transaction
1523 pub fn commit_transaction(&self, txn_id: u64) -> Result<u64> {
1524 let entry = TxnWalEntry::txn_commit(txn_id);
1525 let seq = self.append(&entry);
1526 self.sync()?; // Fsync on commit for durability
1527 Ok(seq)
1528 }
1529
1530 /// Get statistics
1531 pub fn stats(&self) -> ShardedWalStats {
1532 let mut shard_entry_counts = Vec::with_capacity(self.num_shards);
1533 for shard in &self.shards {
1534 shard_entry_counts.push(shard.lock().entry_count);
1535 }
1536
1537 ShardedWalStats {
1538 num_shards: self.num_shards,
1539 total_entries: self.sequence.load(Ordering::SeqCst),
1540 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1541 shard_entry_counts,
1542 }
1543 }
1544}
1545
/// Statistics for sharded WAL, as reported by `ShardedWal::stats`.
#[derive(Debug, Clone)]
pub struct ShardedWalStats {
    /// Number of shards in use
    pub num_shards: usize,
    /// Value of the global sequence counter (entries appended so far)
    pub total_entries: u64,
    /// The id the next transaction allocation will return
    pub next_txn_id: u64,
    /// Entries currently buffered in each shard, indexed by shard
    pub shard_entry_counts: Vec<usize>,
}
1554
/// Detailed crash recovery statistics, filled in by `TxnWal::crash_recovery`.
#[derive(Debug, Clone, Default)]
pub struct CrashRecoveryStats {
    /// Total records read from WAL
    pub total_records: u64,
    /// Number of committed transactions
    pub committed_txns: u64,
    /// Number of uncommitted (rolled back) transactions
    pub rolled_back_txns: u64,
    /// Number of explicitly aborted transactions
    pub aborted_txns: u64,
    /// Number of data writes recovered
    pub recovered_writes: u64,
    /// Number of torn/corrupted records at end (expected on crash)
    pub torn_records: u64,
    /// Size of the WAL file in bytes at the time recovery opened it
    pub bytes_read: u64,
    /// Recovery duration in microseconds
    pub recovery_duration_us: u64,
    /// Highest transaction ID seen (for restarting counter)
    pub max_txn_id: u64,
}
1577
1578impl TxnWal {
1579 /// Get WAL statistics
1580 pub fn stats(&self) -> TxnWalStats {
1581 TxnWalStats {
1582 entries_written: self.sequence.load(Ordering::SeqCst),
1583 bytes_since_sync: self.bytes_since_sync.load(Ordering::Relaxed),
1584 next_txn_id: self.next_txn_id.load(Ordering::SeqCst),
1585 }
1586 }
1587
1588 /// Full crash recovery with detailed statistics
1589 ///
1590 /// This method provides ACID recovery guarantees:
1591 /// 1. **Atomicity**: Uncommitted transactions are rolled back
1592 /// 2. **Durability**: All committed transactions are replayed
1593 /// 3. **Torn Write Detection**: Partial records at EOF are detected via CRC32
1594 ///
1595 /// Returns (committed_writes, stats) where committed_writes contains
1596 /// all key-value pairs from committed transactions in order.
1597 #[allow(clippy::type_complexity)]
1598 pub fn crash_recovery(&self) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, CrashRecoveryStats)> {
1599 let start_time = std::time::Instant::now();
1600 let file = File::open(&self.path)?;
1601 let file_size = file.metadata()?.len();
1602 let mut reader = BufReader::new(file);
1603
1604 let mut stats = CrashRecoveryStats {
1605 bytes_read: file_size,
1606 ..Default::default()
1607 };
1608
1609 let mut committed_txns: HashSet<u64> = HashSet::new();
1610 let mut aborted_txns: HashSet<u64> = HashSet::new();
1611 let mut pending_writes: std::collections::HashMap<u64, Vec<(Vec<u8>, Vec<u8>)>> =
1612 std::collections::HashMap::new();
1613 let mut all_txns: HashSet<u64> = HashSet::new();
1614
1615 // Read all records, stopping at first corruption (torn write)
1616 let mut ordinal: u64 = 0;
1617 loop {
1618 match self.read_record(&mut reader, &mut ordinal) {
1619 Ok(entry) => {
1620 stats.total_records += 1;
1621 if entry.txn_id > stats.max_txn_id {
1622 stats.max_txn_id = entry.txn_id;
1623 }
1624
1625 match entry.record_type {
1626 WalRecordType::TxnBegin => {
1627 pending_writes.insert(entry.txn_id, Vec::new());
1628 all_txns.insert(entry.txn_id);
1629 }
1630 WalRecordType::Data => {
1631 if let Some(writes) = pending_writes.get_mut(&entry.txn_id) {
1632 writes.push((entry.key, entry.value));
1633 }
1634 }
1635 WalRecordType::TxnCommit => {
1636 committed_txns.insert(entry.txn_id);
1637 }
1638 WalRecordType::TxnAbort => {
1639 pending_writes.remove(&entry.txn_id);
1640 aborted_txns.insert(entry.txn_id);
1641 }
1642 _ => {}
1643 }
1644 }
1645 Err(SochDBError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
1646 // Clean EOF
1647 break;
1648 }
1649 Err(e) => {
1650 // Encrypted: an AEAD auth failure / tamper / corruption is a
1651 // hard error — abort recovery rather than silently truncating
1652 // committed data (wrong key is already caught by the keyring
1653 // canary at open). Plaintext: treat trailing corruption as a
1654 // torn write and stop.
1655 if self.encryption.is_enabled() {
1656 return Err(e);
1657 }
1658 stats.torn_records += 1;
1659 break;
1660 }
1661 }
1662 }
1663
1664 // Collect committed writes
1665 let mut result = Vec::new();
1666 for (txn_id, writes) in &pending_writes {
1667 if committed_txns.contains(txn_id) {
1668 stats.committed_txns += 1;
1669 stats.recovered_writes += writes.len() as u64;
1670 result.extend(writes.clone());
1671 }
1672 }
1673
1674 // Count aborted and rolled-back transactions
1675 stats.aborted_txns = aborted_txns.len() as u64;
1676 stats.rolled_back_txns = all_txns.len() as u64 - stats.committed_txns - stats.aborted_txns;
1677
1678 stats.recovery_duration_us = start_time.elapsed().as_micros() as u64;
1679
1680 Ok((result, stats))
1681 }
1682}
1683
1684#[cfg(test)]
1685mod tests {
1686 use super::*;
1687 use tempfile::tempdir;
1688
1689 #[test]
1690 fn test_wal_entry_roundtrip() {
1691 let entry = TxnWalEntry::data(42, b"key".to_vec(), b"value".to_vec());
1692 let bytes = entry.to_bytes();
1693
1694 let mut cursor = std::io::Cursor::new(bytes);
1695 let recovered = TxnWalEntry::from_reader(&mut cursor).unwrap();
1696
1697 assert_eq!(recovered.record_type, WalRecordType::Data);
1698 assert_eq!(recovered.txn_id, 42);
1699 assert_eq!(recovered.key, b"key");
1700 assert_eq!(recovered.value, b"value");
1701 }
1702
1703 #[test]
1704 fn test_wal_append_and_replay() {
1705 let dir = tempdir().unwrap();
1706 let wal_path = dir.path().join("test.wal");
1707
1708 // Write some entries
1709 {
1710 let wal = TxnWal::new(&wal_path).unwrap();
1711 let txn_id = wal.begin_transaction().unwrap();
1712 wal.write(txn_id, b"k1".to_vec(), b"v1".to_vec()).unwrap();
1713 wal.write(txn_id, b"k2".to_vec(), b"v2".to_vec()).unwrap();
1714 wal.commit_transaction(txn_id).unwrap();
1715 }
1716
1717 // Replay and verify
1718 {
1719 let wal = TxnWal::new(&wal_path).unwrap();
1720 let (writes, txn_count) = wal.replay_for_recovery().unwrap();
1721
1722 assert_eq!(txn_count, 1);
1723 assert_eq!(writes.len(), 2);
1724 assert_eq!(writes[0], (b"k1".to_vec(), b"v1".to_vec()));
1725 assert_eq!(writes[1], (b"k2".to_vec(), b"v2".to_vec()));
1726 }
1727 }
1728
1729 #[test]
1730 fn test_uncommitted_transaction_rollback() {
1731 let dir = tempdir().unwrap();
1732 let wal_path = dir.path().join("test.wal");
1733
1734 // Write committed and uncommitted transactions
1735 {
1736 let wal = TxnWal::new(&wal_path).unwrap();
1737
1738 // Committed transaction
1739 let txn1 = wal.begin_transaction().unwrap();
1740 wal.write(txn1, b"committed".to_vec(), b"yes".to_vec())
1741 .unwrap();
1742 wal.commit_transaction(txn1).unwrap();
1743
1744 // Uncommitted transaction (simulates crash)
1745 let txn2 = wal.begin_transaction().unwrap();
1746 wal.write(txn2, b"uncommitted".to_vec(), b"no".to_vec())
1747 .unwrap();
1748 // No commit!
1749 }
1750
1751 // Replay - uncommitted should be rolled back
1752 {
1753 let wal = TxnWal::new(&wal_path).unwrap();
1754 let (writes, txn_count) = wal.replay_for_recovery().unwrap();
1755
1756 assert_eq!(txn_count, 1); // Only committed transaction
1757 assert_eq!(writes.len(), 1);
1758 assert_eq!(writes[0], (b"committed".to_vec(), b"yes".to_vec()));
1759 }
1760 }
1761
1762 #[test]
1763 fn test_aborted_transaction() {
1764 let dir = tempdir().unwrap();
1765 let wal_path = dir.path().join("test.wal");
1766
1767 {
1768 let wal = TxnWal::new(&wal_path).unwrap();
1769
1770 let txn = wal.begin_transaction().unwrap();
1771 wal.write(txn, b"aborted".to_vec(), b"data".to_vec())
1772 .unwrap();
1773 wal.abort_transaction(txn).unwrap();
1774 }
1775
1776 {
1777 let wal = TxnWal::new(&wal_path).unwrap();
1778 let (writes, txn_count) = wal.replay_for_recovery().unwrap();
1779
1780 assert_eq!(txn_count, 0);
1781 assert!(writes.is_empty());
1782 }
1783 }
1784
1785 #[test]
1786 fn test_checksum_validation() {
1787 let entry = TxnWalEntry::data(1, b"key".to_vec(), b"value".to_vec());
1788 let mut bytes = entry.to_bytes();
1789
1790 // Corrupt the checksum
1791 let len = bytes.len();
1792 bytes[len - 1] ^= 0xFF;
1793
1794 let mut cursor = std::io::Cursor::new(bytes);
1795 let result = TxnWalEntry::from_reader(&mut cursor);
1796
1797 assert!(result.is_err());
1798 }
1799
1800 #[test]
1801 fn test_crash_recovery_with_stats() {
1802 let dir = tempdir().unwrap();
1803 let wal_path = dir.path().join("test.wal");
1804
1805 // Simulate complex workload
1806 {
1807 let wal = TxnWal::new(&wal_path).unwrap();
1808
1809 // Committed transaction 1
1810 let txn1 = wal.begin_transaction().unwrap();
1811 wal.write(txn1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
1812 wal.write(txn1, b"k2".to_vec(), b"v2".to_vec()).unwrap();
1813 wal.commit_transaction(txn1).unwrap();
1814
1815 // Aborted transaction
1816 let txn2 = wal.begin_transaction().unwrap();
1817 wal.write(txn2, b"aborted_key".to_vec(), b"aborted_val".to_vec())
1818 .unwrap();
1819 wal.abort_transaction(txn2).unwrap();
1820
1821 // Committed transaction 2
1822 let txn3 = wal.begin_transaction().unwrap();
1823 wal.write(txn3, b"k3".to_vec(), b"v3".to_vec()).unwrap();
1824 wal.commit_transaction(txn3).unwrap();
1825
1826 // Uncommitted transaction (simulates crash)
1827 let txn4 = wal.begin_transaction().unwrap();
1828 wal.write(txn4, b"uncommitted".to_vec(), b"data".to_vec())
1829 .unwrap();
1830 // No commit - simulates crash
1831 }
1832
1833 // Recover and verify
1834 {
1835 let wal = TxnWal::new(&wal_path).unwrap();
1836 let (writes, stats) = wal.crash_recovery().unwrap();
1837
1838 // Should have 3 writes from 2 committed transactions
1839 assert_eq!(writes.len(), 3);
1840 assert_eq!(stats.committed_txns, 2);
1841 assert_eq!(stats.aborted_txns, 1);
1842 assert_eq!(stats.rolled_back_txns, 1); // txn4 was uncommitted
1843 assert_eq!(stats.recovered_writes, 3);
1844 assert!(stats.recovery_duration_us > 0);
1845 }
1846 }
1847
/// A WAL that ends in a partial frame must still recover every committed
/// transaction, and the partial frame must be reported as one torn record.
#[test]
fn test_torn_write_detection() {
    // Six stray bytes: far too short to be a complete record.
    const TORN_TAIL: [u8; 6] = [0x10, 0x00, 0x00, 0x00, 0xFF, 0xFF];

    let tmp = tempdir().unwrap();
    let wal_path = tmp.path().join("test.wal");

    // Phase 1: one fully committed transaction, then release the handle.
    {
        let writer = TxnWal::new(&wal_path).unwrap();
        let txn_id = writer.begin_transaction().unwrap();
        writer
            .write(txn_id, b"key".to_vec(), b"value".to_vec())
            .unwrap();
        writer.commit_transaction(txn_id).unwrap();
    }

    // Phase 2: append the incomplete frame directly to the file, bypassing
    // the WAL API, to imitate a write that was cut short.
    {
        use std::io::Write;
        let mut raw_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&wal_path)
            .unwrap();
        raw_file.write_all(&TORN_TAIL).unwrap();
    }

    // Phase 3: reopening and recovering must succeed despite the bad tail.
    {
        let reopened = TxnWal::new(&wal_path).unwrap();
        let (recovered, report) = reopened.crash_recovery().unwrap();

        // The committed write survives; the tail is counted exactly once.
        assert_eq!(recovered.len(), 1);
        assert_eq!(report.committed_txns, 1);
        assert_eq!(report.torn_records, 1);
    }
}
1884
/// The entry checksum must depend only on the entry's content and must be
/// unchanged after a serialize/deserialize roundtrip.
#[test]
fn test_crc32_determinism() {
    // Every entry gets the same pinned timestamp so that only the value
    // payload can influence the checksum comparison.
    let entry_with_value = |value: Vec<u8>| {
        let mut entry = TxnWalEntry::data(42, b"key".to_vec(), value);
        entry.timestamp_us = 12345;
        entry
    };

    let baseline = entry_with_value(b"value".to_vec());
    let twin = entry_with_value(b"value".to_vec());

    // Identical content => identical checksum.
    assert_eq!(baseline.checksum(), twin.checksum());

    // A different value payload => a different checksum.
    let altered = entry_with_value(b"different".to_vec());
    assert_ne!(baseline.checksum(), altered.checksum());

    // Encoding and decoding again yields an entry with the same checksum.
    let mut reader = std::io::Cursor::new(baseline.to_bytes());
    let decoded = TxnWalEntry::from_reader(&mut reader).unwrap();
    assert_eq!(decoded.checksum(), baseline.checksum());
}
1907}
1908
1909#[cfg(test)]
1910mod encryption_wal_tests {
1911 use super::*;
1912 use crate::encryption::EncryptionEngine;
1913 use tempfile::tempdir;
1914
1915 fn enc(key: u8) -> Arc<EncryptionEngine> {
1916 Arc::new(EncryptionEngine::new(&[key; 32]))
1917 }
1918 const UUID: [u8; 16] = [9u8; 16];
1919
/// Roundtrip through the encrypted WAL: one committed transaction (written
/// via `write`, `write_no_flush_refs` and `flush_buffer`) and one aborted
/// transaction are logged, then the file is reopened with the same key and
/// crash-recovered. Only the committed keys may come back, and the raw file
/// must not expose the checked plaintext.
#[test]
fn encrypted_write_then_recover_roundtrip() {
    let tmp = tempdir().unwrap();
    let wal_path = tmp.path().join("enc.wal");

    {
        let writer = TxnWal::new_with_encryption(&wal_path, enc(7), UUID, 0).unwrap();

        // Committed transaction, touching each of the three write paths.
        let committed = writer.begin_transaction().unwrap();
        writer
            .write(committed, b"alpha".to_vec(), b"one".to_vec())
            .unwrap();
        writer
            .write_no_flush_refs(committed, b"beta", b"two")
            .unwrap();
        let mut batch = TxnWalBuffer::new(committed);
        batch.append(b"gamma", b"three");
        writer.flush_buffer(&batch).unwrap();
        writer.commit_transaction(committed).unwrap();

        // Aborted transaction: its key must be absent after recovery.
        let aborted = writer.begin_transaction().unwrap();
        writer
            .write(aborted, b"ghost".to_vec(), b"x".to_vec())
            .unwrap();
        writer.abort_transaction(aborted).unwrap();
        writer.sync().unwrap();
    }

    // The file on disk must not contain these plaintext byte strings.
    let on_disk = std::fs::read(&wal_path).unwrap();
    assert!(!contains(&on_disk, b"alpha"));
    assert!(!contains(&on_disk, b"three"));

    let reader = TxnWal::new_with_encryption(&wal_path, enc(7), UUID, 0).unwrap();
    let (writes, stats) = reader.crash_recovery().unwrap();

    // Key lookup over the recovered writes without cloning each key.
    let recovered = |key: &[u8]| writes.iter().any(|(k, _)| k.as_slice() == key);
    assert!(recovered(b"alpha"));
    assert!(recovered(b"beta"));
    assert!(recovered(b"gamma"));
    assert!(!recovered(b"ghost"), "aborted txn leaked");
    assert_eq!(stats.committed_txns, 1);
}
1960
/// Opening an encrypted WAL with the wrong key must produce a hard
/// `SochDBError::Encryption` error rather than an apparently empty log.
/// (In production the keyring canary is expected to catch this earlier; this
/// test pins down that the replay path itself does not treat an AEAD failure
/// as end-of-file.)
#[test]
fn wrong_key_fails_loud_not_empty() {
    let tmp = tempdir().unwrap();
    let wal_path = tmp.path().join("enc.wal");

    // Write one committed transaction under key 1.
    {
        let writer = TxnWal::new_with_encryption(&wal_path, enc(1), UUID, 0).unwrap();
        let txn_id = writer.begin_transaction().unwrap();
        writer.write(txn_id, b"k".to_vec(), b"v".to_vec()).unwrap();
        writer.commit_transaction(txn_id).unwrap();
        writer.sync().unwrap();
    }

    // Reopen under key 2. The constructor is expected to replay the log, so
    // the failure has to surface here, and it has to be the encryption
    // variant specifically.
    match TxnWal::new_with_encryption(&wal_path, enc(2), UUID, 0) {
        Ok(_) => panic!("wrong key opened silently (data-loss bug)"),
        Err(SochDBError::Encryption(_)) => {}
        Err(other) => panic!("expected Encryption error, got {other:?}"),
    }
}
1984
/// Corrupting a byte in the middle of a committed encrypted WAL must make
/// either reopening or crash recovery fail; it must never be accepted.
#[test]
fn tamper_midstream_fails_loud() {
    let tmp = tempdir().unwrap();
    let wal_path = tmp.path().join("enc.wal");

    // Produce a small, fully committed and synced encrypted log.
    {
        let writer = TxnWal::new_with_encryption(&wal_path, enc(5), UUID, 0).unwrap();
        let txn_id = writer.begin_transaction().unwrap();
        writer
            .write(txn_id, b"k1".to_vec(), b"v1".to_vec())
            .unwrap();
        writer.commit_transaction(txn_id).unwrap();
        writer.sync().unwrap();
    }

    // Invert the byte at the file's midpoint, deliberately away from the
    // end of the file where damage could be mistaken for a torn tail.
    let mut bytes = std::fs::read(&wal_path).unwrap();
    let target = bytes.len() / 2;
    bytes[target] ^= 0xFF;
    std::fs::write(&wal_path, &bytes).unwrap();

    // Rejection may come from the constructor or from an explicit
    // crash_recovery call afterwards; either counts as failing loudly.
    let rejected = match TxnWal::new_with_encryption(&wal_path, enc(5), UUID, 0) {
        Err(_) => true,
        Ok(wal) => wal.crash_recovery().is_err(),
    };
    assert!(rejected, "tampered encrypted WAL was silently accepted");
}
2013
/// With a disabled engine, appending an entry must put exactly the entry's
/// `to_bytes()` frame on disk (no extra encryption framing). With an enabled
/// engine the on-disk bytes must differ and must not parse cleanly as a
/// plaintext record. A single entry object is reused for both files and for
/// the reference bytes, so its embedded timestamp cannot cause a mismatch.
#[test]
fn disabled_engine_is_byte_identical_to_plaintext() {
    let tmp = tempdir().unwrap();

    let entry = TxnWalEntry::data(42, b"k1".to_vec(), b"v1".to_vec());
    let expected_plain = entry.to_bytes();

    // Disabled engine: the file is the plaintext frame, byte for byte.
    let plain_path = tmp.path().join("disabled.wal");
    {
        let passthrough = Arc::new(EncryptionEngine::disabled());
        let wal =
            TxnWal::new_with_encryption(&plain_path, passthrough, [0u8; 16], 0).unwrap();
        wal.append(&entry).unwrap();
        wal.sync().unwrap();
    }
    let plain_on_disk = std::fs::read(&plain_path).unwrap();
    assert_eq!(
        plain_on_disk, expected_plain,
        "disabled-engine append diverged from legacy plaintext frame"
    );

    // Enabled engine: different bytes on disk.
    let cipher_path = tmp.path().join("enc.wal");
    {
        let wal = TxnWal::new_with_encryption(&cipher_path, enc(3), UUID, 0).unwrap();
        wal.append(&entry).unwrap();
        wal.sync().unwrap();
    }
    let cipher_on_disk = std::fs::read(&cipher_path).unwrap();
    assert_ne!(cipher_on_disk, expected_plain);

    // The plaintext reader must either reject the ciphertext outright or
    // stop before consuming the whole file.
    let mut reader = std::io::Cursor::new(&cipher_on_disk);
    let parse_failed = TxnWalEntry::from_reader(&mut reader).is_err();
    let stopped_short = reader.position() as usize != cipher_on_disk.len();
    assert!(
        parse_failed || stopped_short,
        "ciphertext frame must not parse cleanly as a plaintext record"
    );
}
2061
/// Returns `true` when `needle` occurs as a contiguous run of bytes inside
/// `haystack`.
///
/// An empty `needle` is treated as contained in every haystack (including an
/// empty one), mirroring `str::contains("")`. It is handled before the scan
/// because `slice::windows` panics when asked for zero-length windows.
fn contains(haystack: &[u8], needle: &[u8]) -> bool {
    needle.is_empty()
        || haystack
            .windows(needle.len())
            .any(|window| window == needle)
}
2065}