Skip to main content

sochdb_core/
txn.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Transaction Manager for ACID Transactions
19//!
20//! Provides ACID guarantees using WAL-based transaction management:
21//! - Atomicity: All writes in a transaction succeed or fail together
22//! - Consistency: Transactions move database from valid state to valid state
23//! - Isolation: MVCC snapshot isolation for concurrent transactions
24//! - Durability: Committed transactions survive crashes via WAL
25
26use serde::{Deserialize, Serialize};
27use std::collections::HashSet;
28use std::sync::atomic::{AtomicU64, Ordering};
29
/// Identifier of a transaction; values are handed out in monotonically
/// increasing order.
pub type TxnId = u64;
32
33/// Transaction states
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum TxnState {
36    Active,
37    Committed,
38    Aborted,
39}
40
41/// WAL record types for ACID transactions (ARIES-style)
42///
43/// This is the **canonical** definition. All crates SHOULD use this type
44/// via `use sochdb_core::txn::WalRecordType` rather than defining their own.
45///
46/// # Discriminant Scheme
47///
48/// Values use hex categories for logical grouping:
49/// - `0x01-0x0F`: Data operations
50/// - `0x10-0x1F`: Transaction lifecycle
51/// - `0x20-0x2F`: Checkpoint/recovery
52/// - `0x30-0x3F`: Schema operations  
53/// - `0x40-0x4F`: ARIES-specific records
54/// - `0x50-0x5F`: LSM/compaction operations
55/// - `0x60-0x6F`: Atomic batch operations
56#[repr(u8)]
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
58pub enum WalRecordType {
59    /// Data write within transaction
60    Data = 0x01,
61    /// Page update with before/after images
62    PageUpdate = 0x02,
63    /// Delete operation
64    Delete = 0x03,
65    /// Transaction begin marker
66    TxnBegin = 0x10,
67    /// Transaction commit marker
68    TxnCommit = 0x11,
69    /// Transaction abort marker
70    TxnAbort = 0x12,
71    /// Savepoint within a transaction
72    Savepoint = 0x13,
73    /// Rollback to savepoint
74    RollbackToSavepoint = 0x14,
75    /// Transaction end (resource cleanup after commit/abort)
76    TxnEnd = 0x15,
77    /// Checkpoint for recovery optimization
78    Checkpoint = 0x20,
79    /// End of checkpoint (contains active transactions and dirty pages)
80    CheckpointEnd = 0x21,
81    /// Schema change (DDL)
82    SchemaChange = 0x30,
83    /// Compensation Log Record (CLR) for ARIES undo operations
84    CompensationLogRecord = 0x40,
85    /// LSM compaction record
86    Compaction = 0x50,
87    /// LSM flush (memtable to SSTable)
88    Flush = 0x51,
89    /// Atomic batch begin
90    BatchBegin = 0x60,
91    /// Atomic batch commit  
92    BatchCommit = 0x61,
93}
94
95impl TryFrom<u8> for WalRecordType {
96    type Error = ();
97
98    fn try_from(value: u8) -> Result<Self, Self::Error> {
99        match value {
100            0x01 => Ok(WalRecordType::Data),
101            0x02 => Ok(WalRecordType::PageUpdate),
102            0x03 => Ok(WalRecordType::Delete),
103            0x10 => Ok(WalRecordType::TxnBegin),
104            0x11 => Ok(WalRecordType::TxnCommit),
105            0x12 => Ok(WalRecordType::TxnAbort),
106            0x13 => Ok(WalRecordType::Savepoint),
107            0x14 => Ok(WalRecordType::RollbackToSavepoint),
108            0x15 => Ok(WalRecordType::TxnEnd),
109            0x20 => Ok(WalRecordType::Checkpoint),
110            0x21 => Ok(WalRecordType::CheckpointEnd),
111            0x30 => Ok(WalRecordType::SchemaChange),
112            0x40 => Ok(WalRecordType::CompensationLogRecord),
113            0x50 => Ok(WalRecordType::Compaction),
114            0x51 => Ok(WalRecordType::Flush),
115            0x60 => Ok(WalRecordType::BatchBegin),
116            0x61 => Ok(WalRecordType::BatchCommit),
117            _ => Err(()),
118        }
119    }
120}
121
/// Log Sequence Number (LSN) used by ARIES recovery.
///
/// Ordering guarantee: if `LSN(A) < LSN(B)`, record A precedes record B in the
/// WAL. Recovery relies on this in two places:
/// - Redo: an operation is redone only when `page_lsn < record_lsn`
/// - Undo: records are undone in reverse LSN order
pub type Lsn = u64;
129
/// Identifier of a page, used when tracking dirty pages.
pub type PageId = u64;
132
133/// ARIES transaction table entry for recovery
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct AriesTransactionEntry {
136    /// Transaction ID
137    pub txn_id: TxnId,
138    /// Transaction state during recovery
139    pub state: TxnState,
140    /// LSN of last log record for this transaction
141    pub last_lsn: Lsn,
142    /// LSN to undo next (for rollback)
143    pub undo_next_lsn: Option<Lsn>,
144}
145
146/// ARIES dirty page table entry for recovery
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct AriesDirtyPageEntry {
149    /// Page ID
150    pub page_id: PageId,
151    /// Recovery LSN - first LSN that might have dirtied this page
152    pub rec_lsn: Lsn,
153}
154
155/// Checkpoint data for ARIES recovery
156#[derive(Debug, Clone, Default, Serialize, Deserialize)]
157pub struct AriesCheckpointData {
158    /// Active transactions at checkpoint time
159    pub active_transactions: Vec<AriesTransactionEntry>,
160    /// Dirty pages at checkpoint time
161    pub dirty_pages: Vec<AriesDirtyPageEntry>,
162    /// LSN where checkpoint started
163    pub begin_checkpoint_lsn: Lsn,
164}
165
166/// A write operation buffered in a transaction
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub struct TxnWrite {
169    /// Key being written
170    pub key: Vec<u8>,
171    /// Value being written (None for delete)
172    pub value: Option<Vec<u8>>,
173    /// Table/collection this write belongs to
174    pub table: String,
175}
176
/// A read recorded by a transaction so conflicts can be detected later.
///
/// Two reads are equal (and hash identically) when both `key` and `table`
/// match, which lets them be de-duplicated in a hash set.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct TxnRead {
    /// Key that was read.
    pub key: Vec<u8>,
    /// Table/collection the key was read from.
    pub table: String,
}
183
184/// WAL entry with ARIES transaction support
185///
186/// Extends standard WAL entries with ARIES-specific fields:
187/// - LSN: Log Sequence Number for ordering and idempotent recovery
188/// - prev_lsn: Previous LSN for this transaction (undo chain)
189/// - undo_info: Before-image for undo operations
190/// - page_id: Page affected by this operation
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct TxnWalEntry {
193    /// Type of this WAL record
194    pub record_type: WalRecordType,
195    /// Transaction ID
196    pub txn_id: TxnId,
197    /// Timestamp in microseconds
198    pub timestamp_us: u64,
199    /// Optional key for data records
200    pub key: Option<Vec<u8>>,
201    /// Optional value for data records (after-image)
202    pub value: Option<Vec<u8>>,
203    /// Optional table name
204    pub table: Option<String>,
205    /// CRC32 checksum
206    pub checksum: u32,
207    /// ARIES: Log Sequence Number (assigned when appended to WAL)
208    #[serde(default)]
209    pub lsn: Lsn,
210    /// ARIES: Previous LSN in this transaction's chain (for undo)
211    #[serde(default)]
212    pub prev_lsn: Option<Lsn>,
213    /// ARIES: Page ID affected by this record
214    #[serde(default)]
215    pub page_id: Option<PageId>,
216    /// ARIES: Before-image for undo (original value before update)
217    #[serde(default)]
218    pub undo_info: Option<Vec<u8>>,
219    /// ARIES: For CLRs, the next LSN to undo (skips compensated operations)
220    #[serde(default)]
221    pub undo_next_lsn: Option<Lsn>,
222}
223
224impl TxnWalEntry {
225    pub fn new_begin(txn_id: TxnId, timestamp_us: u64) -> Self {
226        Self {
227            record_type: WalRecordType::TxnBegin,
228            txn_id,
229            timestamp_us,
230            key: None,
231            value: None,
232            table: None,
233            checksum: 0,
234            lsn: 0,
235            prev_lsn: None,
236            page_id: None,
237            undo_info: None,
238            undo_next_lsn: None,
239        }
240    }
241
242    pub fn new_commit(txn_id: TxnId, timestamp_us: u64) -> Self {
243        Self {
244            record_type: WalRecordType::TxnCommit,
245            txn_id,
246            timestamp_us,
247            key: None,
248            value: None,
249            table: None,
250            checksum: 0,
251            lsn: 0,
252            prev_lsn: None,
253            page_id: None,
254            undo_info: None,
255            undo_next_lsn: None,
256        }
257    }
258
259    pub fn new_abort(txn_id: TxnId, timestamp_us: u64) -> Self {
260        Self {
261            record_type: WalRecordType::TxnAbort,
262            txn_id,
263            timestamp_us,
264            key: None,
265            value: None,
266            table: None,
267            checksum: 0,
268            lsn: 0,
269            prev_lsn: None,
270            page_id: None,
271            undo_info: None,
272            undo_next_lsn: None,
273        }
274    }
275
276    pub fn new_data(
277        txn_id: TxnId,
278        timestamp_us: u64,
279        table: String,
280        key: Vec<u8>,
281        value: Option<Vec<u8>>,
282    ) -> Self {
283        Self {
284            record_type: WalRecordType::Data,
285            txn_id,
286            timestamp_us,
287            key: Some(key),
288            value,
289            table: Some(table),
290            checksum: 0,
291            lsn: 0,
292            prev_lsn: None,
293            page_id: None,
294            undo_info: None,
295            undo_next_lsn: None,
296        }
297    }
298
299    /// Create a new ARIES-style data record with before-image for undo
300    #[allow(clippy::too_many_arguments)]
301    pub fn new_aries_data(
302        txn_id: TxnId,
303        timestamp_us: u64,
304        table: String,
305        key: Vec<u8>,
306        value: Option<Vec<u8>>,
307        page_id: PageId,
308        prev_lsn: Option<Lsn>,
309        undo_info: Option<Vec<u8>>,
310    ) -> Self {
311        Self {
312            record_type: WalRecordType::Data,
313            txn_id,
314            timestamp_us,
315            key: Some(key),
316            value,
317            table: Some(table),
318            checksum: 0,
319            lsn: 0, // Assigned when appended to WAL
320            prev_lsn,
321            page_id: Some(page_id),
322            undo_info,
323            undo_next_lsn: None,
324        }
325    }
326
327    /// Create a Compensation Log Record (CLR) for ARIES undo
328    ///
329    /// CLRs are redo-only records that describe undo operations.
330    /// They include undo_next_lsn which points to the next record to undo,
331    /// skipping the compensated operation.
332    #[allow(clippy::too_many_arguments)]
333    pub fn new_clr(
334        txn_id: TxnId,
335        timestamp_us: u64,
336        table: String,
337        key: Vec<u8>,
338        value: Option<Vec<u8>>,
339        page_id: PageId,
340        prev_lsn: Lsn,
341        undo_next_lsn: Lsn,
342    ) -> Self {
343        Self {
344            record_type: WalRecordType::CompensationLogRecord,
345            txn_id,
346            timestamp_us,
347            key: Some(key),
348            value,
349            table: Some(table),
350            checksum: 0,
351            lsn: 0,
352            prev_lsn: Some(prev_lsn),
353            page_id: Some(page_id),
354            undo_info: None, // CLRs don't need undo info (redo-only)
355            undo_next_lsn: Some(undo_next_lsn),
356        }
357    }
358
359    /// Create a checkpoint end record with recovery data
360    pub fn new_checkpoint_end(
361        timestamp_us: u64,
362        checkpoint_data: AriesCheckpointData,
363    ) -> Result<Self, String> {
364        let data = bincode::serialize(&checkpoint_data)
365            .map_err(|e| format!("Failed to serialize checkpoint data: {}", e))?;
366        Ok(Self {
367            record_type: WalRecordType::CheckpointEnd,
368            txn_id: 0,
369            timestamp_us,
370            key: None,
371            value: Some(data),
372            table: None,
373            checksum: 0,
374            lsn: 0,
375            prev_lsn: None,
376            page_id: None,
377            undo_info: None,
378            undo_next_lsn: None,
379        })
380    }
381
382    /// Extract checkpoint data from a CheckpointEnd record
383    pub fn get_checkpoint_data(&self) -> Option<AriesCheckpointData> {
384        if self.record_type != WalRecordType::CheckpointEnd {
385            return None;
386        }
387        self.value
388            .as_ref()
389            .and_then(|data| bincode::deserialize(data).ok())
390    }
391
392    /// Calculate and set checksum
393    pub fn compute_checksum(&mut self) {
394        let data = self.serialize_for_checksum();
395        self.checksum = crc32fast::hash(&data);
396    }
397
398    /// Verify checksum
399    pub fn verify_checksum(&self) -> bool {
400        let data = self.serialize_for_checksum();
401        crc32fast::hash(&data) == self.checksum
402    }
403
404    fn serialize_for_checksum(&self) -> Vec<u8> {
405        // Serialize all fields except checksum for CRC32 computation.
406        // IMPORTANT: This MUST include ALL ARIES fields to prevent silent
407        // data corruption during crash recovery. If a field is not checksummed,
408        // it can be corrupted without detection, breaking ARIES undo/redo chains.
409        let mut buf = Vec::new();
410        buf.push(self.record_type as u8);
411        buf.extend(&self.txn_id.to_le_bytes());
412        buf.extend(&self.timestamp_us.to_le_bytes());
413        if let Some(ref key) = self.key {
414            buf.extend(&(key.len() as u32).to_le_bytes());
415            buf.extend(key);
416        } else {
417            buf.extend(&0u32.to_le_bytes());
418        }
419        if let Some(ref value) = self.value {
420            buf.extend(&(value.len() as u32).to_le_bytes());
421            buf.extend(value);
422        } else {
423            buf.extend(&0u32.to_le_bytes());
424        }
425        if let Some(ref table) = self.table {
426            buf.extend(&(table.len() as u32).to_le_bytes());
427            buf.extend(table.as_bytes());
428        } else {
429            buf.extend(&0u32.to_le_bytes());
430        }
431        // ARIES fields — critical for recovery correctness
432        buf.extend(&self.lsn.to_le_bytes());
433        // prev_lsn: 0 = None, 1 + value = Some(value)
434        match self.prev_lsn {
435            Some(lsn) => {
436                buf.push(1u8);
437                buf.extend(&lsn.to_le_bytes());
438            }
439            None => buf.push(0u8),
440        }
441        // page_id: same encoding
442        match self.page_id {
443            Some(pid) => {
444                buf.push(1u8);
445                buf.extend(&pid.to_le_bytes());
446            }
447            None => buf.push(0u8),
448        }
449        // undo_info: length-prefixed optional blob
450        if let Some(ref undo) = self.undo_info {
451            buf.extend(&(undo.len() as u32).to_le_bytes());
452            buf.extend(undo);
453        } else {
454            buf.extend(&0u32.to_le_bytes());
455        }
456        // undo_next_lsn: same encoding as prev_lsn
457        match self.undo_next_lsn {
458            Some(lsn) => {
459                buf.push(1u8);
460                buf.extend(&lsn.to_le_bytes());
461            }
462            None => buf.push(0u8),
463        }
464        buf
465    }
466
467    /// Wire format version for forward/backward compatibility.
468    ///
469    /// - V0 (0x00): Legacy 6-field format (record_type, txn_id, timestamp_us, key, value, table, checksum).
470    ///              ARIES fields are NOT serialized; from_bytes() defaults them to zero/None.
471    /// - V1 (0x01): Full ARIES format. All fields serialized including lsn, prev_lsn,
472    ///              page_id, undo_info, undo_next_lsn. Checksum covers ALL fields.
473    pub const FORMAT_VERSION: u8 = 0x01;
474
475    /// Serialize to bytes (versioned wire format)
476    ///
477    /// Format V1 layout:
478    /// ```text
479    /// [version:1] [serialize_for_checksum payload] [crc32:4]
480    /// ```
481    pub fn to_bytes(&self) -> Vec<u8> {
482        let payload = self.serialize_for_checksum();
483        let mut buf = Vec::with_capacity(1 + payload.len() + 4);
484        buf.push(Self::FORMAT_VERSION);
485        buf.extend(&payload);
486        buf.extend(&self.checksum.to_le_bytes());
487        buf
488    }
489
490    /// Deserialize from bytes with proper error propagation
491    ///
492    /// Supports both wire format versions:
493    /// - V0 (legacy): No version byte; first byte is a valid WalRecordType discriminant.
494    ///   ARIES fields default to zero/None.
495    /// - V1: First byte is 0x01 (FORMAT_VERSION). All ARIES fields deserialized.
496    ///
497    /// Returns an error if:
498    /// - Data is too short
499    /// - Record type is invalid
500    /// - Data is truncated mid-field
501    /// - UTF-8 encoding is invalid for table name
502    /// - Checksum validation fails
503    pub fn from_bytes(data: &[u8]) -> Result<Self, String> {
504        if data.is_empty() {
505            return Err("WAL entry empty".to_string());
506        }
507
508        // Detect format version: V1 starts with 0x01 which is also WalRecordType::Data,
509        // so we use a heuristic: V1 has version byte 0x01 followed by another record_type byte.
510        // V0 has no version byte, first byte IS the record type.
511        // Disambiguation: if first byte is 0x01 (FORMAT_VERSION/Data) AND data.len() >= 2
512        // AND second byte is a valid WalRecordType, then it's V1.
513        // Otherwise, treat as V0 for backward compatibility.
514        let (version, payload_start) = if data.len() >= 2
515            && data[0] == Self::FORMAT_VERSION
516            && WalRecordType::try_from(data[1]).is_ok()
517        {
518            (1u8, 1usize) // V1: skip version byte
519        } else {
520            (0u8, 0usize) // V0: no version byte
521        };
522
523        let payload = &data[payload_start..];
524
525        // Fixed header: 1 (type) + 8 (txn_id) + 8 (timestamp) = 17 minimum before variable fields
526        if payload.len() < 21 {
527            return Err(format!(
528                "WAL entry too short: {} bytes, need at least 21",
529                payload.len()
530            ));
531        }
532
533        let record_type = WalRecordType::try_from(payload[0])
534            .map_err(|_| format!("Invalid WAL record type: {}", payload[0]))?;
535
536        let txn_id = u64::from_le_bytes(
537            payload[1..9]
538                .try_into()
539                .map_err(|_| "Failed to parse txn_id: slice too short")?,
540        );
541        let timestamp_us = u64::from_le_bytes(
542            payload[9..17]
543                .try_into()
544                .map_err(|_| "Failed to parse timestamp: slice too short")?,
545        );
546
547        let mut offset = 17;
548
549        // Parse key with bounds checking
550        if offset + 4 > payload.len() {
551            return Err(format!(
552                "WAL entry truncated at key_len: offset {} + 4 > {}",
553                offset,
554                payload.len()
555            ));
556        }
557        let key_len = u32::from_le_bytes(
558            payload[offset..offset + 4]
559                .try_into()
560                .map_err(|_| "Failed to parse key_len")?,
561        ) as usize;
562        offset += 4;
563
564        if offset + key_len > payload.len() {
565            return Err(format!(
566                "WAL entry truncated at key: need {} bytes at offset {}, have {}",
567                key_len,
568                offset,
569                payload.len()
570            ));
571        }
572        let key = if key_len > 0 {
573            Some(payload[offset..offset + key_len].to_vec())
574        } else {
575            None
576        };
577        offset += key_len;
578
579        // Parse value with bounds checking
580        if offset + 4 > payload.len() {
581            return Err(format!(
582                "WAL entry truncated at value_len: offset {} + 4 > {}",
583                offset,
584                payload.len()
585            ));
586        }
587        let value_len = u32::from_le_bytes(
588            payload[offset..offset + 4]
589                .try_into()
590                .map_err(|_| "Failed to parse value_len")?,
591        ) as usize;
592        offset += 4;
593
594        if offset + value_len > payload.len() {
595            return Err(format!(
596                "WAL entry truncated at value: need {} bytes at offset {}, have {}",
597                value_len,
598                offset,
599                payload.len()
600            ));
601        }
602        let value = if value_len > 0 {
603            Some(payload[offset..offset + value_len].to_vec())
604        } else {
605            None
606        };
607        offset += value_len;
608
609        // Parse table name with bounds checking
610        if offset + 4 > payload.len() {
611            return Err(format!(
612                "WAL entry truncated at table_len: offset {} + 4 > {}",
613                offset,
614                payload.len()
615            ));
616        }
617        let table_len = u32::from_le_bytes(
618            payload[offset..offset + 4]
619                .try_into()
620                .map_err(|_| "Failed to parse table_len")?,
621        ) as usize;
622        offset += 4;
623
624        if offset + table_len > payload.len() {
625            return Err(format!(
626                "WAL entry truncated at table: need {} bytes at offset {}, have {}",
627                table_len,
628                offset,
629                payload.len()
630            ));
631        }
632        let table = if table_len > 0 {
633            Some(
634                String::from_utf8(payload[offset..offset + table_len].to_vec())
635                    .map_err(|e| format!("Invalid UTF-8 in table name: {}", e))?,
636            )
637        } else {
638            None
639        };
640        offset += table_len;
641
642        // Parse ARIES fields for V1, default for V0
643        let (lsn, prev_lsn, page_id, undo_info, undo_next_lsn) = if version >= 1 {
644            // V1: ARIES fields follow table
645            if offset + 8 > payload.len() {
646                return Err(format!(
647                    "WAL V1 entry truncated at lsn: offset {} + 8 > {}",
648                    offset,
649                    payload.len()
650                ));
651            }
652            let lsn = u64::from_le_bytes(
653                payload[offset..offset + 8]
654                    .try_into()
655                    .map_err(|_| "Failed to parse lsn")?,
656            );
657            offset += 8;
658
659            // prev_lsn: tag byte + optional u64
660            if offset >= payload.len() {
661                return Err("WAL V1 entry truncated at prev_lsn tag".to_string());
662            }
663            let prev_lsn = if payload[offset] == 1 {
664                offset += 1;
665                if offset + 8 > payload.len() {
666                    return Err("WAL V1 entry truncated at prev_lsn value".to_string());
667                }
668                let v = u64::from_le_bytes(
669                    payload[offset..offset + 8]
670                        .try_into()
671                        .map_err(|_| "Failed to parse prev_lsn")?,
672                );
673                offset += 8;
674                Some(v)
675            } else {
676                offset += 1;
677                None
678            };
679
680            // page_id: tag byte + optional u64
681            if offset >= payload.len() {
682                return Err("WAL V1 entry truncated at page_id tag".to_string());
683            }
684            let page_id = if payload[offset] == 1 {
685                offset += 1;
686                if offset + 8 > payload.len() {
687                    return Err("WAL V1 entry truncated at page_id value".to_string());
688                }
689                let v = u64::from_le_bytes(
690                    payload[offset..offset + 8]
691                        .try_into()
692                        .map_err(|_| "Failed to parse page_id")?,
693                );
694                offset += 8;
695                Some(v)
696            } else {
697                offset += 1;
698                None
699            };
700
701            // undo_info: length-prefixed optional blob
702            if offset + 4 > payload.len() {
703                return Err("WAL V1 entry truncated at undo_info_len".to_string());
704            }
705            let undo_len = u32::from_le_bytes(
706                payload[offset..offset + 4]
707                    .try_into()
708                    .map_err(|_| "Failed to parse undo_info_len")?,
709            ) as usize;
710            offset += 4;
711            let undo_info = if undo_len > 0 {
712                if offset + undo_len > payload.len() {
713                    return Err("WAL V1 entry truncated at undo_info data".to_string());
714                }
715                let v = payload[offset..offset + undo_len].to_vec();
716                offset += undo_len;
717                Some(v)
718            } else {
719                None
720            };
721
722            // undo_next_lsn: tag byte + optional u64
723            if offset >= payload.len() {
724                return Err("WAL V1 entry truncated at undo_next_lsn tag".to_string());
725            }
726            let undo_next_lsn = if payload[offset] == 1 {
727                offset += 1;
728                if offset + 8 > payload.len() {
729                    return Err("WAL V1 entry truncated at undo_next_lsn value".to_string());
730                }
731                let v = u64::from_le_bytes(
732                    payload[offset..offset + 8]
733                        .try_into()
734                        .map_err(|_| "Failed to parse undo_next_lsn")?,
735                );
736                offset += 8;
737                Some(v)
738            } else {
739                offset += 1;
740                None
741            };
742
743            (lsn, prev_lsn, page_id, undo_info, undo_next_lsn)
744        } else {
745            // V0: ARIES fields default to zero/None for backward compatibility
746            (0u64, None, None, None, None)
747        };
748
749        // Parse checksum with bounds checking
750        if offset + 4 > payload.len() {
751            return Err(format!(
752                "WAL entry truncated at checksum: offset {} + 4 > {}",
753                offset,
754                payload.len()
755            ));
756        }
757        let checksum = u32::from_le_bytes(
758            payload[offset..offset + 4]
759                .try_into()
760                .map_err(|_| "Failed to parse checksum")?,
761        );
762
763        let entry = Self {
764            record_type,
765            txn_id,
766            timestamp_us,
767            key,
768            value,
769            table,
770            checksum,
771            lsn,
772            prev_lsn,
773            page_id,
774            undo_info,
775            undo_next_lsn,
776        };
777
778        // Verify checksum to detect corruption
779        if !entry.verify_checksum() {
780            return Err(format!(
781                "WAL entry checksum mismatch for txn_id {}: expected valid checksum, got {}",
782                entry.txn_id, entry.checksum
783            ));
784        }
785
786        Ok(entry)
787    }
788}
789
790/// Transaction handle for the user
791#[derive(Debug)]
792pub struct Transaction {
793    /// Unique transaction ID
794    pub id: TxnId,
795    /// Transaction state
796    pub state: TxnState,
797    /// Start timestamp for MVCC
798    pub start_ts: u64,
799    /// Commit timestamp (set on commit)
800    pub commit_ts: Option<u64>,
801    /// Buffered writes
802    pub writes: Vec<TxnWrite>,
803    /// Read set for conflict detection
804    pub read_set: HashSet<TxnRead>,
805    /// Isolation level
806    pub isolation: IsolationLevel,
807}
808
/// Isolation level a transaction runs under.
///
/// The default is [`IsolationLevel::SnapshotIsolation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read committed: reads see committed changes.
    ReadCommitted,
    /// Snapshot isolation: reads see a consistent point-in-time view.
    #[default]
    SnapshotIsolation,
    /// Serializable: the strongest isolation level.
    Serializable,
}
820
821impl Transaction {
822    /// Create a new transaction
823    pub fn new(id: TxnId, start_ts: u64, isolation: IsolationLevel) -> Self {
824        Self {
825            id,
826            state: TxnState::Active,
827            start_ts,
828            commit_ts: None,
829            writes: Vec::new(),
830            read_set: HashSet::new(),
831            isolation,
832        }
833    }
834
835    /// Buffer a write operation
836    pub fn put(&mut self, table: &str, key: Vec<u8>, value: Vec<u8>) {
837        self.writes.push(TxnWrite {
838            key,
839            value: Some(value),
840            table: table.to_string(),
841        });
842    }
843
844    /// Buffer a delete operation
845    pub fn delete(&mut self, table: &str, key: Vec<u8>) {
846        self.writes.push(TxnWrite {
847            key,
848            value: None,
849            table: table.to_string(),
850        });
851    }
852
853    /// Record a read for conflict detection
854    pub fn record_read(&mut self, table: &str, key: Vec<u8>) {
855        self.read_set.insert(TxnRead {
856            key,
857            table: table.to_string(),
858        });
859    }
860
861    /// Check for read-your-writes
862    pub fn get_local(&self, table: &str, key: &[u8]) -> Option<&TxnWrite> {
863        self.writes
864            .iter()
865            .rev()
866            .find(|w| w.table == table && w.key == key)
867    }
868
869    /// Check if transaction has any writes
870    pub fn is_read_only(&self) -> bool {
871        self.writes.is_empty()
872    }
873}
874
/// Counters maintained by the transaction manager. `Default` is all zeros.
#[derive(Debug, Clone, Default)]
pub struct TxnStats {
    /// Number of transactions currently active.
    pub active_count: u64,
    /// Number of transactions marked committed.
    pub committed_count: u64,
    /// Number of transactions marked aborted.
    pub aborted_count: u64,
    /// Number of aborts attributed to conflicts.
    pub conflict_aborts: u64,
}
883
884/// Transaction Manager (in-memory, no WAL durability)
885///
886/// Manages transaction lifecycle and provides ACID guarantees for in-memory
887/// operations. This implementation does NOT include WAL integration.
888/// 
889/// For production workloads requiring durability, use [`sochdb_storage::MvccTransactionManager`]
890/// which includes:
891/// - Write-ahead logging for crash recovery  
892/// - Serializable Snapshot Isolation (SSI)
893/// - Group commit for high throughput
894/// - Event-driven async architecture
895pub struct TransactionManager {
896    /// Next transaction ID
897    next_txn_id: AtomicU64,
898    /// Current timestamp counter
899    timestamp_counter: AtomicU64,
900    /// Committed transaction watermark
901    committed_watermark: AtomicU64,
902    /// Statistics
903    stats: parking_lot::RwLock<TxnStats>,
904}
905
906impl TransactionManager {
907    pub fn new() -> Self {
908        Self {
909            next_txn_id: AtomicU64::new(1),
910            timestamp_counter: AtomicU64::new(1),
911            committed_watermark: AtomicU64::new(0),
912            stats: parking_lot::RwLock::new(TxnStats::default()),
913        }
914    }
915
916    /// Begin a new transaction
917    pub fn begin(&self) -> Transaction {
918        self.begin_with_isolation(IsolationLevel::default())
919    }
920
921    /// Begin a transaction with specific isolation level
922    pub fn begin_with_isolation(&self, isolation: IsolationLevel) -> Transaction {
923        let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
924        let start_ts = self.timestamp_counter.fetch_add(1, Ordering::SeqCst);
925
926        {
927            let mut stats = self.stats.write();
928            stats.active_count += 1;
929        }
930
931        Transaction::new(txn_id, start_ts, isolation)
932    }
933
934    /// Get commit timestamp
935    pub fn get_commit_ts(&self) -> u64 {
936        self.timestamp_counter.fetch_add(1, Ordering::SeqCst)
937    }
938
939    /// Mark transaction as committed
940    pub fn mark_committed(&self, txn: &mut Transaction) {
941        txn.state = TxnState::Committed;
942        txn.commit_ts = Some(self.get_commit_ts());
943
944        let mut stats = self.stats.write();
945        stats.active_count = stats.active_count.saturating_sub(1);
946        stats.committed_count += 1;
947    }
948
949    /// Mark transaction as aborted
950    pub fn mark_aborted(&self, txn: &mut Transaction) {
951        txn.state = TxnState::Aborted;
952
953        let mut stats = self.stats.write();
954        stats.active_count = stats.active_count.saturating_sub(1);
955        stats.aborted_count += 1;
956    }
957
958    /// Mark transaction as aborted due to conflict
959    pub fn mark_conflict_abort(&self, txn: &mut Transaction) {
960        self.mark_aborted(txn);
961
962        let mut stats = self.stats.write();
963        stats.conflict_aborts += 1;
964    }
965
966    /// Get the oldest active transaction timestamp
967    pub fn oldest_active_ts(&self) -> u64 {
968        self.committed_watermark.load(Ordering::SeqCst)
969    }
970
971    /// Update the committed watermark
972    pub fn advance_watermark(&self, new_watermark: u64) {
973        self.committed_watermark
974            .fetch_max(new_watermark, Ordering::SeqCst);
975    }
976
977    /// Get current stats
978    pub fn stats(&self) -> TxnStats {
979        self.stats.read().clone()
980    }
981}
982
983impl Default for TransactionManager {
984    fn default() -> Self {
985        Self::new()
986    }
987}
988
#[cfg(test)]
mod tests {
    use super::*;

    /// Seal `entry` with its checksum, serialize it, and parse the bytes back.
    fn checksummed_roundtrip(mut entry: TxnWalEntry) -> TxnWalEntry {
        entry.compute_checksum();
        let encoded = entry.to_bytes();
        TxnWalEntry::from_bytes(&encoded).unwrap()
    }

    /// A new transaction is active and read-only; a write clears the
    /// read-only flag and committing sets the state and commit timestamp.
    #[test]
    fn test_transaction_lifecycle() {
        let manager = TransactionManager::new();

        let mut tx = manager.begin();
        assert_eq!(tx.state, TxnState::Active);
        assert!(tx.is_read_only());

        tx.put("users", vec![1], vec![2, 3, 4]);
        assert!(!tx.is_read_only());

        manager.mark_committed(&mut tx);
        assert_eq!(tx.state, TxnState::Committed);
        assert!(tx.commit_ts.is_some());
    }

    /// The most recent buffered write to a key is the one returned locally.
    #[test]
    fn test_read_your_writes() {
        let manager = TransactionManager::new();
        let mut tx = manager.begin();

        tx.put("users", vec![1], vec![10, 20]);
        // Second write to the same key overwrites the first.
        tx.put("users", vec![1], vec![30, 40]);

        let latest = tx.get_local("users", &[1]).map(|w| w.value.as_deref());
        assert_eq!(latest, Some(Some(&[30, 40][..])));
    }

    /// Basic data entry survives a serialize/parse round trip.
    #[test]
    fn test_wal_entry_serialization() {
        let original = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );

        let decoded = checksummed_roundtrip(original);

        assert_eq!(decoded.txn_id, 42);
        assert_eq!(decoded.timestamp_us, 1234567890);
        assert_eq!(decoded.table.as_deref(), Some("users"));
        assert_eq!(decoded.key, Some(vec![1, 2, 3]));
        assert_eq!(decoded.value, Some(vec![4, 5, 6]));
        assert!(decoded.verify_checksum());
    }

    /// ARIES fields survive a serialization round trip (T1-T2 fix).
    #[test]
    fn test_wal_entry_aries_roundtrip() {
        let page_id = 42;
        let prev_lsn = Some(100);
        let undo_info = Some(vec![0xDE, 0xAD]);

        let mut original = TxnWalEntry::new_aries_data(
            99,
            9999999,
            "orders".to_string(),
            vec![10, 20],
            Some(vec![30, 40, 50]),
            page_id,
            prev_lsn,
            undo_info,
        );
        original.lsn = 200;
        original.undo_next_lsn = Some(50);

        let decoded = checksummed_roundtrip(original);

        assert_eq!(decoded.txn_id, 99);
        assert_eq!(decoded.lsn, 200);
        assert_eq!(decoded.prev_lsn, Some(100));
        assert_eq!(decoded.page_id, Some(42));
        assert_eq!(decoded.undo_info, Some(vec![0xDE, 0xAD]));
        assert_eq!(decoded.undo_next_lsn, Some(50));
        assert_eq!(decoded.key, Some(vec![10, 20]));
        assert_eq!(decoded.value, Some(vec![30, 40, 50]));
        assert!(decoded.verify_checksum());
    }

    /// Compensation log records keep their ARIES links across a round trip.
    #[test]
    fn test_wal_entry_clr_roundtrip() {
        let page_id = 10;
        let prev_lsn = 300;
        let undo_next_lsn = 250;

        let mut original = TxnWalEntry::new_clr(
            77,
            5555555,
            "inventory".to_string(),
            vec![1],
            Some(vec![2]),
            page_id,
            prev_lsn,
            undo_next_lsn,
        );
        original.lsn = 400;

        let decoded = checksummed_roundtrip(original);

        assert_eq!(decoded.record_type, WalRecordType::CompensationLogRecord);
        assert_eq!(decoded.lsn, 400);
        assert_eq!(decoded.prev_lsn, Some(300));
        assert_eq!(decoded.page_id, Some(10));
        assert_eq!(decoded.undo_next_lsn, Some(250));
        // CLRs carry no undo_info.
        assert!(decoded.undo_info.is_none());
        assert!(decoded.verify_checksum());
    }

    /// An entry whose ARIES fields are all None/0 round-trips unchanged.
    #[test]
    fn test_wal_entry_none_aries_fields_roundtrip() {
        let decoded = checksummed_roundtrip(TxnWalEntry::new_begin(1, 100));

        assert_eq!(decoded.lsn, 0);
        assert_eq!(decoded.prev_lsn, None);
        assert_eq!(decoded.page_id, None);
        assert_eq!(decoded.undo_info, None);
        assert_eq!(decoded.undo_next_lsn, None);
        assert!(decoded.verify_checksum());
    }

    /// Counters follow begin / commit / abort transitions.
    #[test]
    fn test_transaction_stats() {
        let manager = TransactionManager::new();

        let mut first = manager.begin();
        let mut second = manager.begin();

        assert_eq!(manager.stats().active_count, 2);

        manager.mark_committed(&mut first);
        assert_eq!(manager.stats().committed_count, 1);

        manager.mark_aborted(&mut second);
        let after_abort = manager.stats();
        assert_eq!(after_abort.aborted_count, 1);
        assert_eq!(after_abort.active_count, 0);
    }

    /// Input shorter than the minimum entry size is rejected.
    #[test]
    fn test_wal_entry_error_too_short() {
        let too_short = vec![0u8; 10];

        let err = TxnWalEntry::from_bytes(&too_short).unwrap_err();
        assert!(err.contains("too short"));
    }

    /// An unknown record-type byte is rejected.
    #[test]
    fn test_wal_entry_error_invalid_record_type() {
        let mut raw = vec![0u8; 30];
        // 255 is not a valid record type.
        raw[0] = 255;

        let err = TxnWalEntry::from_bytes(&raw).unwrap_err();
        assert!(err.contains("Invalid WAL record type"));
    }

    /// A key length that exceeds the available bytes is rejected.
    #[test]
    fn test_wal_entry_error_truncated_key() {
        let mut entry =
            TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1, 2], Some(vec![3, 4]));
        entry.compute_checksum();
        let mut encoded = entry.to_bytes();

        // Overwrite key_len so the entry claims a 10000-byte key it does not contain.
        let claimed_key_len: u32 = 10000;
        encoded[17..21].copy_from_slice(&claimed_key_len.to_le_bytes());

        let err = TxnWalEntry::from_bytes(&encoded).unwrap_err();
        assert!(err.contains("truncated at key"));
    }

    /// Flipping bits in the trailing checksum is detected.
    #[test]
    fn test_wal_entry_error_corrupted_checksum() {
        let mut entry = TxnWalEntry::new_data(
            42,
            1234567890,
            "users".to_string(),
            vec![1, 2, 3],
            Some(vec![4, 5, 6]),
        );
        entry.compute_checksum();

        let mut encoded = entry.to_bytes();
        // The checksum occupies the last 4 bytes; invert the final one.
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;

        let err = TxnWalEntry::from_bytes(&encoded).unwrap_err();
        assert!(err.contains("checksum mismatch"));
    }

    /// Corrupting the table name bytes makes parsing fail.
    #[test]
    fn test_wal_entry_error_invalid_utf8_table() {
        let mut entry = TxnWalEntry::new_data(1, 100, "test".to_string(), vec![1], Some(vec![2]));
        entry.compute_checksum();
        let mut encoded = entry.to_bytes();

        // Layout assumed by this test:
        // header (1 + 8 + 8 = 17), key_len (4), key (1), value_len (4),
        // value (1), table_len (4), then the 4 table bytes.
        let table_start = 17 + 4 + 1 + 4 + 1 + 4;
        // 0xFF can never appear in valid UTF-8.
        encoded[table_start] = 0xFF;

        // Either the checksum check or the UTF-8 check may reject it.
        assert!(TxnWalEntry::from_bytes(&encoded).is_err());
    }
}