Skip to main content

sochdb_core/
txn.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
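//! Transaction Manager for ACID Transactions
//!
//! Provides the building blocks for ACID transactions with WAL-based
//! transaction management:
//! - Atomicity: All writes in a transaction succeed or fail together
//! - Consistency: Transactions move the database from one valid state to another
//! - Isolation: MVCC snapshot isolation for concurrent transactions
//! - Durability: Committed transactions survive crashes via the WAL
//!
//! This module defines the WAL record format (`TxnWalEntry`, including ARIES
//! recovery fields), user-facing transaction handles (`Transaction`) and an
//! in-memory `TransactionManager`. That manager does not itself write a WAL;
//! see its documentation for the durable alternative.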
25
26use serde::{Deserialize, Serialize};
27use std::collections::HashSet;
28use std::sync::atomic::{AtomicU64, Ordering};
29
/// Transaction ID - monotonically increasing
///
/// `TransactionManager` hands these out from a counter that starts at 1 and is
/// incremented once per `begin`. Records that belong to no transaction use `0`
/// (see `TxnWalEntry::new_checkpoint_end`).
pub type TxnId = u64;
32
/// Transaction states
///
/// Transactions start `Active`. `TransactionManager::mark_committed` moves them
/// to `Committed`; `mark_aborted` and `mark_conflict_abort` move them to `Aborted`.
///
/// The serde derives identify unit variants by declaration index in index-based
/// formats such as bincode. Append new variants at the end rather than
/// reordering or inserting them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TxnState {
    /// Begun and not yet marked committed or aborted
    Active,
    /// Marked committed (a commit timestamp has been assigned)
    Committed,
    /// Marked aborted
    Aborted,
}
40
/// WAL record types for ACID transactions (ARIES-style)
///
/// The explicit discriminant is the tag byte that `TxnWalEntry::to_bytes` writes
/// (`record_type as u8`) and that the `TryFrom<u8>` impl decodes, so the two must
/// stay in sync. Tags are grouped by category:
/// - `0x0_`: data
/// - `0x1_`: transaction control
/// - `0x2_`: checkpoints
/// - `0x3_`: DDL
/// - `0x4_`: compensation records
///
/// The serde derives do not use these discriminants. In index-based formats
/// such as bincode they identify variants by declaration index, so new variants
/// must be appended at the end; never reorder or insert them.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalRecordType {
    /// Data write within transaction
    Data = 0x01,
    /// Transaction begin marker
    TxnBegin = 0x10,
    /// Transaction commit marker
    TxnCommit = 0x11,
    /// Transaction abort marker
    TxnAbort = 0x12,
    /// Checkpoint for recovery optimization
    Checkpoint = 0x20,
    /// Schema change (DDL)
    SchemaChange = 0x30,
    /// Compensation Log Record (CLR) for ARIES undo operations
    CompensationLogRecord = 0x40,
    /// End of checkpoint (contains active transactions and dirty pages)
    CheckpointEnd = 0x21,
    /// Page update with before/after images
    PageUpdate = 0x02,
}
64
65impl TryFrom<u8> for WalRecordType {
66    type Error = ();
67
68    fn try_from(value: u8) -> Result<Self, Self::Error> {
69        match value {
70            0x01 => Ok(WalRecordType::Data),
71            0x10 => Ok(WalRecordType::TxnBegin),
72            0x11 => Ok(WalRecordType::TxnCommit),
73            0x12 => Ok(WalRecordType::TxnAbort),
74            0x20 => Ok(WalRecordType::Checkpoint),
75            0x21 => Ok(WalRecordType::CheckpointEnd),
76            0x30 => Ok(WalRecordType::SchemaChange),
77            0x40 => Ok(WalRecordType::CompensationLogRecord),
78            0x02 => Ok(WalRecordType::PageUpdate),
79            _ => Err(()),
80        }
81    }
82}
83
/// Log Sequence Number (LSN) for ARIES recovery
///
/// LSN ordering guarantee: If LSN(A) < LSN(B), then A happened before B in the WAL.
/// This is critical for:
/// - Redo: Only redo operations where page_lsn < record_lsn
/// - Undo: Process undo in reverse LSN order
///
/// On `TxnWalEntry`, an `lsn` of 0 means "not assigned". The constructors leave
/// it at 0 until the record is appended to the WAL. `from_bytes` also returns 0,
/// because the binary format does not carry LSNs.
pub type Lsn = u64;

/// Page ID used in the dirty page table (`AriesDirtyPageEntry`) and for the
/// page affected by a WAL record (`TxnWalEntry::page_id`).
pub type PageId = u64;
94
/// ARIES transaction table entry for recovery
///
/// Stored inside `AriesCheckpointData`, which is bincode-encoded into
/// `CheckpointEnd` WAL records. Field order is therefore part of the on-disk
/// encoding: do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AriesTransactionEntry {
    /// Transaction ID
    pub txn_id: TxnId,
    /// Transaction state during recovery
    pub state: TxnState,
    /// LSN of last log record for this transaction
    pub last_lsn: Lsn,
    /// LSN to undo next (for rollback); presumably `None` once nothing remains
    /// to undo — confirm against the recovery code
    pub undo_next_lsn: Option<Lsn>,
}
107
/// ARIES dirty page table entry for recovery
///
/// Stored inside `AriesCheckpointData`, which is bincode-encoded into
/// `CheckpointEnd` WAL records. Do not reorder fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AriesDirtyPageEntry {
    /// Page ID
    pub page_id: PageId,
    /// Recovery LSN - first LSN that might have dirtied this page
    pub rec_lsn: Lsn,
}
116
/// Checkpoint data for ARIES recovery
///
/// Carried as the bincode-encoded `value` of a `CheckpointEnd` WAL record. It is
/// built by `TxnWalEntry::new_checkpoint_end` and decoded by
/// `TxnWalEntry::get_checkpoint_data`. Field order is part of that encoding.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AriesCheckpointData {
    /// Active transactions at checkpoint time
    pub active_transactions: Vec<AriesTransactionEntry>,
    /// Dirty pages at checkpoint time
    pub dirty_pages: Vec<AriesDirtyPageEntry>,
    /// LSN where checkpoint started
    pub begin_checkpoint_lsn: Lsn,
}
127
/// A write operation buffered in a transaction
///
/// Produced by `Transaction::put` (`value: Some`) and `Transaction::delete`
/// (`value: None`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxnWrite {
    /// Key being written
    pub key: Vec<u8>,
    /// Value being written (None for delete)
    pub value: Option<Vec<u8>>,
    /// Table/collection this write belongs to
    pub table: String,
}
138
/// A read operation recorded for conflict detection
///
/// Stored in `Transaction::read_set`. The `Hash`/`Eq` derives make repeated
/// reads of the same `(table, key)` collapse into one entry. Nothing in this
/// module validates the read set; conflict checks presumably happen in the
/// storage layer (TODO confirm).
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct TxnRead {
    /// Key that was read
    pub key: Vec<u8>,
    /// Table/collection the key belongs to
    pub table: String,
}
145
/// WAL entry with ARIES transaction support
///
/// Extends standard WAL entries with ARIES-specific fields:
/// - LSN: Log Sequence Number for ordering and idempotent recovery
/// - prev_lsn: Previous LSN for this transaction (undo chain)
/// - undo_info: Before-image for undo operations
/// - page_id: Page affected by this operation
///
/// Two encodings exist:
/// - serde (derived) covers every field. `#[serde(default)]` on the ARIES
///   fields lets self-describing formats (e.g. JSON) omit them, such as data
///   written before they existed. Index-based formats such as bincode still
///   require every field, in declaration order.
/// - `to_bytes` / `from_bytes` use a hand-rolled format that carries only
///   `record_type`, `txn_id`, `timestamp_us`, `key`, `value`, `table` and
///   `checksum`. The ARIES fields are NOT written and come back as 0/`None`.
///   The CRC32 `checksum` does not cover them either.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxnWalEntry {
    /// Type of this WAL record
    pub record_type: WalRecordType,
    /// Transaction ID (0 for records that belong to no transaction)
    pub txn_id: TxnId,
    /// Timestamp in microseconds
    pub timestamp_us: u64,
    /// Optional key for data records
    pub key: Option<Vec<u8>>,
    /// Optional value for data records (after-image)
    pub value: Option<Vec<u8>>,
    /// Optional table name
    pub table: Option<String>,
    /// CRC32 checksum over the binary payload (see `compute_checksum`)
    pub checksum: u32,
    /// ARIES: Log Sequence Number (assigned when appended to WAL; 0 until then)
    #[serde(default)]
    pub lsn: Lsn,
    /// ARIES: Previous LSN in this transaction's chain (for undo)
    #[serde(default)]
    pub prev_lsn: Option<Lsn>,
    /// ARIES: Page ID affected by this record
    #[serde(default)]
    pub page_id: Option<PageId>,
    /// ARIES: Before-image for undo (original value before update)
    #[serde(default)]
    pub undo_info: Option<Vec<u8>>,
    /// ARIES: For CLRs, the next LSN to undo (skips compensated operations)
    #[serde(default)]
    pub undo_next_lsn: Option<Lsn>,
}
185
186impl TxnWalEntry {
187    pub fn new_begin(txn_id: TxnId, timestamp_us: u64) -> Self {
188        Self {
189            record_type: WalRecordType::TxnBegin,
190            txn_id,
191            timestamp_us,
192            key: None,
193            value: None,
194            table: None,
195            checksum: 0,
196            lsn: 0,
197            prev_lsn: None,
198            page_id: None,
199            undo_info: None,
200            undo_next_lsn: None,
201        }
202    }
203
204    pub fn new_commit(txn_id: TxnId, timestamp_us: u64) -> Self {
205        Self {
206            record_type: WalRecordType::TxnCommit,
207            txn_id,
208            timestamp_us,
209            key: None,
210            value: None,
211            table: None,
212            checksum: 0,
213            lsn: 0,
214            prev_lsn: None,
215            page_id: None,
216            undo_info: None,
217            undo_next_lsn: None,
218        }
219    }
220
221    pub fn new_abort(txn_id: TxnId, timestamp_us: u64) -> Self {
222        Self {
223            record_type: WalRecordType::TxnAbort,
224            txn_id,
225            timestamp_us,
226            key: None,
227            value: None,
228            table: None,
229            checksum: 0,
230            lsn: 0,
231            prev_lsn: None,
232            page_id: None,
233            undo_info: None,
234            undo_next_lsn: None,
235        }
236    }
237
238    pub fn new_data(
239        txn_id: TxnId,
240        timestamp_us: u64,
241        table: String,
242        key: Vec<u8>,
243        value: Option<Vec<u8>>,
244    ) -> Self {
245        Self {
246            record_type: WalRecordType::Data,
247            txn_id,
248            timestamp_us,
249            key: Some(key),
250            value,
251            table: Some(table),
252            checksum: 0,
253            lsn: 0,
254            prev_lsn: None,
255            page_id: None,
256            undo_info: None,
257            undo_next_lsn: None,
258        }
259    }
260
261    /// Create a new ARIES-style data record with before-image for undo
262    #[allow(clippy::too_many_arguments)]
263    pub fn new_aries_data(
264        txn_id: TxnId,
265        timestamp_us: u64,
266        table: String,
267        key: Vec<u8>,
268        value: Option<Vec<u8>>,
269        page_id: PageId,
270        prev_lsn: Option<Lsn>,
271        undo_info: Option<Vec<u8>>,
272    ) -> Self {
273        Self {
274            record_type: WalRecordType::Data,
275            txn_id,
276            timestamp_us,
277            key: Some(key),
278            value,
279            table: Some(table),
280            checksum: 0,
281            lsn: 0, // Assigned when appended to WAL
282            prev_lsn,
283            page_id: Some(page_id),
284            undo_info,
285            undo_next_lsn: None,
286        }
287    }
288
289    /// Create a Compensation Log Record (CLR) for ARIES undo
290    ///
291    /// CLRs are redo-only records that describe undo operations.
292    /// They include undo_next_lsn which points to the next record to undo,
293    /// skipping the compensated operation.
294    #[allow(clippy::too_many_arguments)]
295    pub fn new_clr(
296        txn_id: TxnId,
297        timestamp_us: u64,
298        table: String,
299        key: Vec<u8>,
300        value: Option<Vec<u8>>,
301        page_id: PageId,
302        prev_lsn: Lsn,
303        undo_next_lsn: Lsn,
304    ) -> Self {
305        Self {
306            record_type: WalRecordType::CompensationLogRecord,
307            txn_id,
308            timestamp_us,
309            key: Some(key),
310            value,
311            table: Some(table),
312            checksum: 0,
313            lsn: 0,
314            prev_lsn: Some(prev_lsn),
315            page_id: Some(page_id),
316            undo_info: None, // CLRs don't need undo info (redo-only)
317            undo_next_lsn: Some(undo_next_lsn),
318        }
319    }
320
321    /// Create a checkpoint end record with recovery data
322    pub fn new_checkpoint_end(
323        timestamp_us: u64,
324        checkpoint_data: AriesCheckpointData,
325    ) -> Result<Self, String> {
326        let data = bincode::serialize(&checkpoint_data)
327            .map_err(|e| format!("Failed to serialize checkpoint data: {}", e))?;
328        Ok(Self {
329            record_type: WalRecordType::CheckpointEnd,
330            txn_id: 0,
331            timestamp_us,
332            key: None,
333            value: Some(data),
334            table: None,
335            checksum: 0,
336            lsn: 0,
337            prev_lsn: None,
338            page_id: None,
339            undo_info: None,
340            undo_next_lsn: None,
341        })
342    }
343
344    /// Extract checkpoint data from a CheckpointEnd record
345    pub fn get_checkpoint_data(&self) -> Option<AriesCheckpointData> {
346        if self.record_type != WalRecordType::CheckpointEnd {
347            return None;
348        }
349        self.value
350            .as_ref()
351            .and_then(|data| bincode::deserialize(data).ok())
352    }
353
354    /// Calculate and set checksum
355    pub fn compute_checksum(&mut self) {
356        let data = self.serialize_for_checksum();
357        self.checksum = crc32fast::hash(&data);
358    }
359
360    /// Verify checksum
361    pub fn verify_checksum(&self) -> bool {
362        let data = self.serialize_for_checksum();
363        crc32fast::hash(&data) == self.checksum
364    }
365
366    fn serialize_for_checksum(&self) -> Vec<u8> {
367        // Serialize without checksum field
368        let mut buf = Vec::new();
369        buf.push(self.record_type as u8);
370        buf.extend(&self.txn_id.to_le_bytes());
371        buf.extend(&self.timestamp_us.to_le_bytes());
372        if let Some(ref key) = self.key {
373            buf.extend(&(key.len() as u32).to_le_bytes());
374            buf.extend(key);
375        } else {
376            buf.extend(&0u32.to_le_bytes());
377        }
378        if let Some(ref value) = self.value {
379            buf.extend(&(value.len() as u32).to_le_bytes());
380            buf.extend(value);
381        } else {
382            buf.extend(&0u32.to_le_bytes());
383        }
384        if let Some(ref table) = self.table {
385            buf.extend(&(table.len() as u32).to_le_bytes());
386            buf.extend(table.as_bytes());
387        } else {
388            buf.extend(&0u32.to_le_bytes());
389        }
390        buf
391    }
392
393    /// Serialize to bytes
394    pub fn to_bytes(&self) -> Vec<u8> {
395        let mut buf = self.serialize_for_checksum();
396        buf.extend(&self.checksum.to_le_bytes());
397        buf
398    }
399
400    /// Deserialize from bytes with proper error propagation
401    ///
402    /// Returns an error if:
403    /// - Data is too short (minimum 21 bytes)
404    /// - Record type is invalid
405    /// - Data is truncated mid-field
406    /// - UTF-8 encoding is invalid for table name
407    /// - Checksum validation fails
408    pub fn from_bytes(data: &[u8]) -> Result<Self, String> {
409        // Fixed header: 1 (type) + 8 (txn_id) + 8 (timestamp) + 4 (checksum minimum) = 21
410        if data.len() < 21 {
411            return Err(format!(
412                "WAL entry too short: {} bytes, need at least 21",
413                data.len()
414            ));
415        }
416
417        let record_type = WalRecordType::try_from(data[0])
418            .map_err(|_| format!("Invalid WAL record type: {}", data[0]))?;
419
420        let txn_id = u64::from_le_bytes(
421            data[1..9]
422                .try_into()
423                .map_err(|_| "Failed to parse txn_id: slice too short")?,
424        );
425        let timestamp_us = u64::from_le_bytes(
426            data[9..17]
427                .try_into()
428                .map_err(|_| "Failed to parse timestamp: slice too short")?,
429        );
430
431        let mut offset = 17;
432
433        // Parse key with bounds checking
434        if offset + 4 > data.len() {
435            return Err(format!(
436                "WAL entry truncated at key_len: offset {} + 4 > {}",
437                offset,
438                data.len()
439            ));
440        }
441        let key_len = u32::from_le_bytes(
442            data[offset..offset + 4]
443                .try_into()
444                .map_err(|_| "Failed to parse key_len")?,
445        ) as usize;
446        offset += 4;
447
448        if offset + key_len > data.len() {
449            return Err(format!(
450                "WAL entry truncated at key: need {} bytes at offset {}, have {}",
451                key_len,
452                offset,
453                data.len()
454            ));
455        }
456        let key = if key_len > 0 {
457            Some(data[offset..offset + key_len].to_vec())
458        } else {
459            None
460        };
461        offset += key_len;
462
463        // Parse value with bounds checking
464        if offset + 4 > data.len() {
465            return Err(format!(
466                "WAL entry truncated at value_len: offset {} + 4 > {}",
467                offset,
468                data.len()
469            ));
470        }
471        let value_len = u32::from_le_bytes(
472            data[offset..offset + 4]
473                .try_into()
474                .map_err(|_| "Failed to parse value_len")?,
475        ) as usize;
476        offset += 4;
477
478        if offset + value_len > data.len() {
479            return Err(format!(
480                "WAL entry truncated at value: need {} bytes at offset {}, have {}",
481                value_len,
482                offset,
483                data.len()
484            ));
485        }
486        let value = if value_len > 0 {
487            Some(data[offset..offset + value_len].to_vec())
488        } else {
489            None
490        };
491        offset += value_len;
492
493        // Parse table name with bounds checking
494        if offset + 4 > data.len() {
495            return Err(format!(
496                "WAL entry truncated at table_len: offset {} + 4 > {}",
497                offset,
498                data.len()
499            ));
500        }
501        let table_len = u32::from_le_bytes(
502            data[offset..offset + 4]
503                .try_into()
504                .map_err(|_| "Failed to parse table_len")?,
505        ) as usize;
506        offset += 4;
507
508        if offset + table_len > data.len() {
509            return Err(format!(
510                "WAL entry truncated at table: need {} bytes at offset {}, have {}",
511                table_len,
512                offset,
513                data.len()
514            ));
515        }
516        let table = if table_len > 0 {
517            Some(
518                String::from_utf8(data[offset..offset + table_len].to_vec())
519                    .map_err(|e| format!("Invalid UTF-8 in table name: {}", e))?,
520            )
521        } else {
522            None
523        };
524        offset += table_len;
525
526        // Parse checksum with bounds checking
527        if offset + 4 > data.len() {
528            return Err(format!(
529                "WAL entry truncated at checksum: offset {} + 4 > {}",
530                offset,
531                data.len()
532            ));
533        }
534        let checksum = u32::from_le_bytes(
535            data[offset..offset + 4]
536                .try_into()
537                .map_err(|_| "Failed to parse checksum")?,
538        );
539
540        let entry = Self {
541            record_type,
542            txn_id,
543            timestamp_us,
544            key,
545            value,
546            table,
547            checksum,
548            // ARIES fields default to zero/None for backward compatibility
549            lsn: 0,
550            prev_lsn: None,
551            page_id: None,
552            undo_info: None,
553            undo_next_lsn: None,
554        };
555
556        // Verify checksum to detect corruption
557        if !entry.verify_checksum() {
558            return Err(format!(
559                "WAL entry checksum mismatch for txn_id {}: expected valid checksum, got {}",
560                entry.txn_id, entry.checksum
561            ));
562        }
563
564        Ok(entry)
565    }
566}
567
/// Transaction handle for the user
///
/// Normally created through `TransactionManager::begin` / `begin_with_isolation`,
/// which start it in the `Active` state. Writes are buffered in `writes` and
/// reads are recorded in `read_set`. Nothing in this module applies the
/// buffered writes to storage.
#[derive(Debug)]
pub struct Transaction {
    /// Unique transaction ID
    pub id: TxnId,
    /// Transaction state (updated by `TransactionManager::mark_*`)
    pub state: TxnState,
    /// Start timestamp for MVCC
    pub start_ts: u64,
    /// Commit timestamp (set on commit by `TransactionManager::mark_committed`)
    pub commit_ts: Option<u64>,
    /// Buffered writes in the order they were issued; deletes have `value: None`
    pub writes: Vec<TxnWrite>,
    /// Read set for conflict detection
    pub read_set: HashSet<TxnRead>,
    /// Isolation level
    pub isolation: IsolationLevel,
}
586
/// Transaction isolation levels
///
/// The level is only recorded on each `Transaction`; nothing in this module
/// checks it. The guarantees described below depend on the layer that executes
/// the transaction (TODO confirm where they are enforced).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read committed - see committed changes
    ReadCommitted,
    /// Snapshot isolation - consistent point-in-time view (the default)
    #[default]
    SnapshotIsolation,
    /// Serializable - strongest isolation
    Serializable,
}
598
599impl Transaction {
600    /// Create a new transaction
601    pub fn new(id: TxnId, start_ts: u64, isolation: IsolationLevel) -> Self {
602        Self {
603            id,
604            state: TxnState::Active,
605            start_ts,
606            commit_ts: None,
607            writes: Vec::new(),
608            read_set: HashSet::new(),
609            isolation,
610        }
611    }
612
613    /// Buffer a write operation
614    pub fn put(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
615        self.writes.push(TxnWrite {
616            key,
617            value: Some(value),
618            table: table.to_string(),
619        });
620    }
621
622    /// Buffer a delete operation
623    pub fn delete(&mut self, table: &str, key: Vec<u8>) {
624        self.writes.push(TxnWrite {
625            key,
626            value: None,
627            table: table.to_string(),
628        });
629    }
630
631    /// Record a read for conflict detection
632    pub fn record_read(&mut self, table: &str, key: Vec<u8>) {
633        self.read_set.insert(TxnRead {
634            key,
635            table: table.to_string(),
636        });
637    }
638
639    /// Check for read-your-writes
640    pub fn get_local(&self, table: &str, key: &[u8]) -> Option<&TxnWrite> {
641        self.writes
642            .iter()
643            .rev()
644            .find(|w| w.table == table && w.key == key)
645    }
646
647    /// Check if transaction has any writes
648    pub fn is_read_only(&self) -> bool {
649        self.writes.is_empty()
650    }
651}
652
/// Transaction Manager stats (a snapshot copy returned by `TransactionManager::stats`)
#[derive(Debug, Clone, Default)]
pub struct TxnStats {
    /// Incremented by `begin`; decremented (saturating at 0) when a transaction
    /// is marked committed or aborted
    pub active_count: u64,
    /// Transactions marked committed
    pub committed_count: u64,
    /// Transactions marked aborted, including conflict aborts
    pub aborted_count: u64,
    /// Aborts recorded through `mark_conflict_abort` (also counted in `aborted_count`)
    pub conflict_aborts: u64,
}
661
/// Transaction Manager (in-memory, no WAL durability)
///
/// Manages the transaction lifecycle for in-memory operations: it hands out
/// ids and timestamps, records state transitions and keeps statistics. This
/// implementation does NOT include WAL integration.
///
/// For production workloads requiring durability, use
/// `sochdb_storage::MvccTransactionManager`, which includes:
/// - Write-ahead logging for crash recovery
/// - Serializable Snapshot Isolation (SSI)
/// - Group commit for high throughput
/// - Event-driven async architecture
pub struct TransactionManager {
    /// Next transaction ID
    next_txn_id: AtomicU64,
    /// Current timestamp counter (shared by start and commit timestamps)
    timestamp_counter: AtomicU64,
    /// Committed transaction watermark (reported by `oldest_active_ts`)
    committed_watermark: AtomicU64,
    /// Statistics
    stats: parking_lot::RwLock<TxnStats>,
}
683
684impl TransactionManager {
685    pub fn new() -> Self {
686        Self {
687            next_txn_id: AtomicU64::new(1),
688            timestamp_counter: AtomicU64::new(1),
689            committed_watermark: AtomicU64::new(0),
690            stats: parking_lot::RwLock::new(TxnStats::default()),
691        }
692    }
693
694    /// Begin a new transaction
695    pub fn begin(&self) -> Transaction {
696        self.begin_with_isolation(IsolationLevel::default())
697    }
698
699    /// Begin a transaction with specific isolation level
700    pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> Transaction {
701        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
702        let start_ts = self.timestamp_counter.fetch_add(1, Ordering::SeqCst);
703
704        {
705            let mut stats = self.stats.write();
706            stats.active_count += 1;
707        }
708
709        Transaction::new(txn_id, start_ts, isolation)
710    }
711
712    /// Get commit timestamp
713    pub fn get_commit_ts(&self) -> u64 {
714        self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
715    }
716
717    /// Mark transaction as committed
718    pub fn mark_committed(&self, txn: &mut Transaction) {
719        txn.state = TxnState::Committed;
720        txn.commit_ts = Some(self.get_commit_ts());
721
722        let mut stats = self.stats.write();
723        stats.active_count = stats.active_count.saturating_sub(1);
724        stats.committed_count += 1;
725    }
726
727    /// Mark transaction as aborted
728    pub fn mark_aborted(&self, txn: &mut Transaction) {
729        txn.state = TxnState::Aborted;
730
731        let mut stats = self.stats.write();
732        stats.active_count = stats.active_count.saturating_sub(1);
733        stats.aborted_count += 1;
734    }
735
736    /// Mark transaction as aborted due to conflict
737    pub fn mark_conflict_abort(&self, txn: &mut Transaction) {
738        self.mark_aborted(txn);
739
740        let mut stats = self.stats.write();
741        stats.conflict_aborts += 1;
742    }
743
744    /// Get the oldest active transaction timestamp
745    pub fn oldest_active_ts(&self) -> u64 {
746        self.committed_watermark.load(Ordering::SeqCst)
747    }
748
749    /// Update the committed watermark
750    pub fn advance_watermark(&self, new_watermark: u64) {
751        self.committed_watermark
752            .fetch_max(new_watermark, Ordering::SeqCst);
753    }
754
755    /// Get current stats
756    pub fn stats(&self) -> TxnStats {
757        self.stats.read().clone()
758    }
759}
760
761impl Default for TransactionManager {
762    fn default() -> Self {
763        Self::new()
764    }
765}
766
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_transaction_lifecycle() {
        let mgr = TransactionManager::new();

        let mut txn = mgr.begin();
        assert_eq!(txn.state, TxnState::Active);
        assert!(txn.is_read_only());

        txn.put("users", vec![1], vec![2, 3, 4]);
        assert!(!txn.is_read_only());

        mgr.mark_committed(&mut txn);
        assert_eq!(txn.state, TxnState::Committed);
        assert!(txn.commit_ts.is_some());
    }

    #[test]
    fn test_read_your_writes() {
        let mgr = TransactionManager::new();
        let mut txn = mgr.begin();

        txn.put("users", vec![1], vec![10, 20]);
        txn.put("users", vec![1], vec![30, 40]); // Overwrite

        let local = txn.get_local("users", &[1]);
        assert!(local.is_some());
        assert_eq!(local.unwrap().value, Some(vec![30, 40]));
    }

    #[test]
    fn test_wal_entry_serialization() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();

        let bytes = entry.to_bytes();
        let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();

        assert_eq!(parsed.txn_id, 42);
        assert_eq!(parsed.timestamp_us, 1234567890);
        assert_eq!(parsed.table, Some("users".to_string()));
        assert_eq!(parsed.key, Some(vec![1, 2, 3]));
        assert_eq!(parsed.value, Some(vec![4, 5, 6]));
        assert!(parsed.verify_checksum());
    }

    #[test]
    fn test_wal_marker_round_trip() {
        let cases = [
            (TxnWalEntry::new_begin(7, 100), WalRecordType::TxnBegin),
            (TxnWalEntry::new_commit(7, 101), WalRecordType::TxnCommit),
            (TxnWalEntry::new_abort(7, 102), WalRecordType::TxnAbort),
        ];
        for (mut entry, expected) in cases {
            entry.compute_checksum();
            let bytes = entry.to_bytes();
            // 17-byte header + three empty length prefixes + checksum
            assert_eq!(bytes.len(), 33);

            let parsed = TxnWalEntry::from_bytes(&bytes).unwrap();
            assert_eq!(parsed.record_type, expected);
            assert_eq!(parsed.txn_id, 7);
            assert_eq!(parsed.timestamp_us, entry.timestamp_us);
            assert_eq!(parsed.key, None);
            assert_eq!(parsed.value, None);
            assert_eq!(parsed.table, None);
        }
    }

    #[test]
    fn test_checkpoint_end_round_trip() {
        let data = AriesCheckpointData {
            active_transactions: vec![AriesTransactionEntry {
                txn_id: 5,
                state: TxnState::Active,
                last_lsn: 10,
                undo_next_lsn: Some(9),
            }],
            dirty_pages: vec![AriesDirtyPageEntry {
                page_id: 3,
                rec_lsn: 7,
            }],
            begin_checkpoint_lsn: 4,
        };
        let mut entry = TxnWalEntry::new_checkpoint_end(55, data).unwrap();
        entry.compute_checksum();

        let parsed = TxnWalEntry::from_bytes(&entry.to_bytes()).unwrap();
        assert_eq!(parsed.record_type, WalRecordType::CheckpointEnd);
        assert_eq!(parsed.txn_id, 0);

        let restored = parsed
            .get_checkpoint_data()
            .expect("checkpoint payload should decode");
        assert_eq!(restored.begin_checkpoint_lsn, 4);
        assert_eq!(restored.active_transactions.len(), 1);
        assert_eq!(restored.active_transactions[0].txn_id, 5);
        assert_eq!(restored.active_transactions[0].undo_next_lsn, Some(9));
        assert_eq!(restored.dirty_pages.len(), 1);
        assert_eq!(restored.dirty_pages[0].page_id, 3);
        assert_eq!(restored.dirty_pages[0].rec_lsn, 7);

        // Non-checkpoint records never yield checkpoint data.
        assert!(TxnWalEntry::new_begin(1, 1).get_checkpoint_data().is_none());
    }

    #[test]
    fn test_transaction_stats() {
        let mgr = TransactionManager::new();

        let mut txn1 = mgr.begin();
        let mut txn2 = mgr.begin();

        assert_eq!(mgr.stats().active_count, 2);

        mgr.mark_committed(&mut txn1);
        assert_eq!(mgr.stats().committed_count, 1);

        mgr.mark_aborted(&mut txn2);
        assert_eq!(mgr.stats().aborted_count, 1);
        assert_eq!(mgr.stats().active_count, 0);
    }

    #[test]
    fn test_conflict_abort_stats() {
        let mgr = TransactionManager::new();
        let mut txn = mgr.begin();

        mgr.mark_conflict_abort(&mut txn);
        assert_eq!(txn.state, TxnState::Aborted);

        let stats = mgr.stats();
        assert_eq!(stats.active_count, 0);
        assert_eq!(stats.aborted_count, 1);
        assert_eq!(stats.conflict_aborts, 1);
        assert_eq!(stats.committed_count, 0);
    }

    #[test]
    fn test_watermark_is_monotonic() {
        let mgr = TransactionManager::new();
        assert_eq!(mgr.oldest_active_ts(), 0);

        mgr.advance_watermark(10);
        mgr.advance_watermark(5); // Lower values are ignored
        assert_eq!(mgr.oldest_active_ts(), 10);
    }

    #[test]
    fn test_wal_entry_error_too_short() {
        // Less than minimum 21 bytes
        let short_data = vec![0u8; 10];
        let result = TxnWalEntry::from_bytes(&short_data);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("too short"));
    }

    #[test]
    fn test_wal_entry_error_invalid_record_type() {
        // Create data with invalid record type (255)
        let mut data = vec![0u8; 30];
        data[0] = 255; // Invalid record type
        let result = TxnWalEntry::from_bytes(&data);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Invalid WAL record type"));
    }

    #[test]
    fn test_wal_entry_error_truncated_key() {
        // Start from a valid entry, then corrupt key_len so it claims a
        // 10000-byte key that the buffer cannot hold.
        let mut entry =
            TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1, 2], Some(vec![3, 4]));
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();

        let huge_len: u32 = 10000;
        bytes[17..21].copy_from_slice(&huge_len.to_le_bytes());

        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("truncated at key"));
    }

    #[test]
    fn test_wal_entry_error_truncated_checksum() {
        let mut entry = TxnWalEntry::new_data(1, 100, "t".to_string(), vec![1], Some(vec![2]));
        entry.compute_checksum();
        let bytes = entry.to_bytes();

        // Drop half of the trailing 4-byte checksum.
        let result = TxnWalEntry::from_bytes(&bytes[..bytes.len() - 2]);
        assert!(result.unwrap_err().contains("truncated at checksum"));
    }

    #[test]
    fn test_wal_entry_error_corrupted_checksum() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();

        let mut bytes = entry.to_bytes();
        // Corrupt the checksum (last 4 bytes)
        let len = bytes.len();
        bytes[len - 1] ^= 0xFF; // Flip bits

        let result = TxnWalEntry::from_bytes(&bytes);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("checksum mismatch"));
    }

    #[test]
    fn test_wal_entry_error_invalid_utf8_table() {
        let mut entry = TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1], Some(vec![2]));
        entry.compute_checksum();
        let mut bytes = entry.to_bytes();

        // Find table offset and corrupt UTF-8.
        // Header: 1 + 8 + 8 = 17, key_len: 4, key: 1, value_len: 4, value: 1, table_len: 4, table: 4
        let table_start = 17 + 4 + 1 + 4 + 1 + 4;
        bytes[table_start] = 0xFF; // Invalid UTF-8 byte

        // The table name is decoded before the checksum is verified, so the
        // UTF-8 error is reported rather than a checksum mismatch.
        let err = TxnWalEntry::from_bytes(&bytes).unwrap_err();
        assert!(err.contains("Invalid UTF-8"), "unexpected error: {}", err);
    }
}