sqlitegraph 2.0.7

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
//! Tests for IN_PROGRESS transaction recovery
//!
//! This test module verifies that IN_PROGRESS transactions are correctly
//! handled during WAL recovery:
//! 1. IN_PROGRESS transactions are detected and logged
//! 2. IN_PROGRESS transactions are NOT replayed (committed=false)
//! 3. Database state is preserved as if the transaction never happened

use sqlitegraph::backend::native::v2::wal::V2WALRecord;
use sqlitegraph::backend::native::v2::wal::recovery::core::TransactionState;
use tempfile::tempdir;

/// Test that uncommitted transactions are filtered out during replay
///
/// This test verifies the core logic that prevents IN_PROGRESS transactions
/// from being replayed during recovery.
#[test]
fn test_uncommitted_transactions_filtered() {
    // Create test transactions with different states
    let transactions = vec![
        // Committed transaction - should be replayed
        TransactionState {
            tx_id: 1,
            start_lsn: 1,
            commit_lsn: Some(10),
            records: vec![V2WALRecord::NodeInsert {
                node_id: 1,
                slot_offset: 1000,
                node_data: vec![1, 2, 3],
            }],
            committed: true,
            timestamp: 0,
        },
        // IN_PROGRESS transaction - should NOT be replayed
        TransactionState {
            tx_id: 2,
            start_lsn: 11,
            commit_lsn: None, // No commit LSN = IN_PROGRESS
            records: vec![V2WALRecord::NodeInsert {
                node_id: 2,
                slot_offset: 2000,
                node_data: vec![4, 5, 6],
            }],
            committed: false, // IN_PROGRESS transactions have committed=false
            timestamp: 0,
        },
        // Rolled back transaction - should NOT be replayed
        TransactionState {
            tx_id: 3,
            start_lsn: 21,
            commit_lsn: Some(30),
            records: vec![V2WALRecord::NodeInsert {
                node_id: 3,
                slot_offset: 3000,
                node_data: vec![7, 8, 9],
            }],
            committed: false, // Explicitly rolled back
            timestamp: 0,
        },
    ];

    // Apply the same filtering logic as replay_transactions()
    let committed_transactions: Vec<_> = transactions
        .iter()
        .filter(|tx| tx.committed && tx.commit_lsn.is_some())
        .collect();

    // Verify only TX 1 (committed) is included
    assert_eq!(
        committed_transactions.len(),
        1,
        "Only committed transactions should be replayed"
    );
    assert_eq!(
        committed_transactions[0].tx_id, 1,
        "TX 1 should be included"
    );
}

/// Test that finalize_incomplete_transactions marks IN_PROGRESS correctly
///
/// This verifies the scanner's behavior when finishing a WAL scan:
/// active transactions should be marked as incomplete (committed=false).
#[test]
fn test_finalize_incomplete_transactions_behavior() {
    use parking_lot::Mutex;
    use std::collections::HashMap;
    use std::sync::Arc;

    // Simulate active_transactions state
    let active_transactions: Arc<Mutex<HashMap<u64, TransactionState>>> =
        Arc::new(Mutex::new(HashMap::new()));

    // Insert an IN_PROGRESS transaction (as would happen during WAL scanning)
    {
        let mut active = active_transactions.lock();
        active.insert(
            1,
            TransactionState {
                tx_id: 1,
                start_lsn: 1,
                commit_lsn: None, // No commit yet = IN_PROGRESS
                records: vec![],
                committed: false, // IN_PROGRESS transactions start as uncommitted
                timestamp: 0,
            },
        );
    }

    // Simulate finalize_incomplete_transactions behavior
    let mut finalized_transactions = Vec::new();
    let mut warnings = Vec::new();

    {
        let mut active = active_transactions.lock();
        for (_, tx_state) in active.drain() {
            warnings.push(format!(
                "Incomplete transaction TX {} recovered",
                tx_state.tx_id
            ));
            finalized_transactions.push(tx_state);
        }
    }

    // Verify the IN_PROGRESS transaction was finalized
    assert_eq!(
        finalized_transactions.len(),
        1,
        "IN_PROGRESS transaction should be finalized"
    );
    assert_eq!(finalized_transactions[0].tx_id, 1);
    assert_eq!(
        finalized_transactions[0].committed, false,
        "Should remain uncommitted"
    );
    assert_eq!(
        finalized_transactions[0].commit_lsn, None,
        "Should have no commit LSN"
    );
    assert_eq!(warnings.len(), 1);
    assert!(warnings[0].contains("Incomplete transaction TX 1 recovered"));
}

/// Test that TransactionState is correctly initialized for new transactions
///
/// This verifies the base state of IN_PROGRESS transactions.
#[test]
fn test_transaction_state_initialization() {
    let tx_state = TransactionState {
        tx_id: 42,
        start_lsn: 100,
        commit_lsn: None,
        records: vec![],
        committed: false, // IN_PROGRESS = not committed
        timestamp: 1234567890,
    };

    // Verify IN_PROGRESS transaction state
    assert_eq!(tx_state.tx_id, 42);
    assert_eq!(tx_state.start_lsn, 100);
    assert_eq!(tx_state.commit_lsn, None, "IN_PROGRESS has no commit LSN");
    assert_eq!(tx_state.committed, false, "IN_PROGRESS is not committed");
    assert_eq!(tx_state.records.len(), 0);

    // Verify this transaction would be filtered out during replay
    let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
    assert!(
        !should_replay,
        "IN_PROGRESS transactions should not be replayed"
    );
}

/// Test committed transaction passes the filter
#[test]
fn test_committed_transaction_passes_filter() {
    let tx_state = TransactionState {
        tx_id: 1,
        start_lsn: 1,
        commit_lsn: Some(10), // Has commit LSN
        records: vec![],
        committed: true, // Explicitly committed
        timestamp: 0,
    };

    // Verify committed transaction state
    assert_eq!(tx_state.commit_lsn, Some(10));
    assert_eq!(tx_state.committed, true);

    // Verify this transaction would be included during replay
    let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
    assert!(should_replay, "Committed transactions should be replayed");
}

/// Test multiple IN_PROGRESS transactions are all filtered
#[test]
fn test_multiple_in_progress_transactions_filtered() {
    let transactions = vec![
        TransactionState {
            tx_id: 1,
            start_lsn: 1,
            commit_lsn: Some(10),
            records: vec![],
            committed: true,
            timestamp: 0,
        },
        // Multiple IN_PROGRESS transactions
        TransactionState {
            tx_id: 2,
            start_lsn: 11,
            commit_lsn: None,
            records: vec![],
            committed: false,
            timestamp: 0,
        },
        TransactionState {
            tx_id: 3,
            start_lsn: 21,
            commit_lsn: None,
            records: vec![],
            committed: false,
            timestamp: 0,
        },
        TransactionState {
            tx_id: 4,
            start_lsn: 31,
            commit_lsn: Some(40),
            records: vec![],
            committed: true,
            timestamp: 0,
        },
    ];

    let committed_transactions: Vec<_> = transactions
        .iter()
        .filter(|tx| tx.committed && tx.commit_lsn.is_some())
        .collect();

    assert_eq!(
        committed_transactions.len(),
        2,
        "Only TX 1 and TX 4 should pass filter"
    );
    assert_eq!(committed_transactions[0].tx_id, 1);
    assert_eq!(committed_transactions[1].tx_id, 4);
}

/// Test edge case: committed without commit_lsn (should be filtered)
#[test]
fn test_committed_without_commit_lsn_filtered() {
    // Edge case: Transaction marked committed but no LSN
    // This should be filtered out as it's likely incomplete
    let tx_state = TransactionState {
        tx_id: 1,
        start_lsn: 1,
        commit_lsn: None, // No commit LSN
        records: vec![],
        committed: true, // But marked as committed
        timestamp: 0,
    };

    // The filter requires BOTH conditions
    let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
    assert!(
        !should_replay,
        "Transactions without commit_lsn should not replay"
    );
}

/// Test rollback transaction state
#[test]
fn test_rollback_transaction_state() {
    let tx_state = TransactionState {
        tx_id: 1,
        start_lsn: 1,
        commit_lsn: Some(10), // Has LSN (rollback record)
        records: vec![],
        committed: false, // Rolled back
        timestamp: 0,
    };

    // Verify rolled back transaction is NOT replayed
    let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
    assert!(
        !should_replay,
        "Rolled back transactions should not be replayed"
    );
}