1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
//! Tests for IN_PROGRESS transaction recovery
//!
//! This test module verifies that IN_PROGRESS transactions are correctly
//! handled during WAL recovery:
//! 1. IN_PROGRESS transactions are detected and logged
//! 2. IN_PROGRESS transactions are NOT replayed (committed=false)
//! 3. Database state is preserved as if the transaction never happened
use sqlitegraph::backend::native::v2::wal::V2WALRecord;
use sqlitegraph::backend::native::v2::wal::recovery::core::TransactionState;
use tempfile::tempdir;
/// Test that uncommitted transactions are filtered out during replay
///
/// This test verifies the core logic that prevents IN_PROGRESS transactions
/// from being replayed during recovery.
#[test]
fn test_uncommitted_transactions_filtered() {
// Create test transactions with different states
let transactions = vec![
// Committed transaction - should be replayed
TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10),
records: vec![V2WALRecord::NodeInsert {
node_id: 1,
slot_offset: 1000,
node_data: vec![1, 2, 3],
}],
committed: true,
timestamp: 0,
},
// IN_PROGRESS transaction - should NOT be replayed
TransactionState {
tx_id: 2,
start_lsn: 11,
commit_lsn: None, // No commit LSN = IN_PROGRESS
records: vec![V2WALRecord::NodeInsert {
node_id: 2,
slot_offset: 2000,
node_data: vec![4, 5, 6],
}],
committed: false, // IN_PROGRESS transactions have committed=false
timestamp: 0,
},
// Rolled back transaction - should NOT be replayed
TransactionState {
tx_id: 3,
start_lsn: 21,
commit_lsn: Some(30),
records: vec![V2WALRecord::NodeInsert {
node_id: 3,
slot_offset: 3000,
node_data: vec![7, 8, 9],
}],
committed: false, // Explicitly rolled back
timestamp: 0,
},
];
// Apply the same filtering logic as replay_transactions()
let committed_transactions: Vec<_> = transactions
.iter()
.filter(|tx| tx.committed && tx.commit_lsn.is_some())
.collect();
// Verify only TX 1 (committed) is included
assert_eq!(
committed_transactions.len(),
1,
"Only committed transactions should be replayed"
);
assert_eq!(
committed_transactions[0].tx_id, 1,
"TX 1 should be included"
);
}
/// Test that finalize_incomplete_transactions marks IN_PROGRESS correctly
///
/// This verifies the scanner's behavior when finishing a WAL scan:
/// active transactions should be marked as incomplete (committed=false).
#[test]
fn test_finalize_incomplete_transactions_behavior() {
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
// Simulate active_transactions state
let active_transactions: Arc<Mutex<HashMap<u64, TransactionState>>> =
Arc::new(Mutex::new(HashMap::new()));
// Insert an IN_PROGRESS transaction (as would happen during WAL scanning)
{
let mut active = active_transactions.lock();
active.insert(
1,
TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: None, // No commit yet = IN_PROGRESS
records: vec![],
committed: false, // IN_PROGRESS transactions start as uncommitted
timestamp: 0,
},
);
}
// Simulate finalize_incomplete_transactions behavior
let mut finalized_transactions = Vec::new();
let mut warnings = Vec::new();
{
let mut active = active_transactions.lock();
for (_, tx_state) in active.drain() {
warnings.push(format!(
"Incomplete transaction TX {} recovered",
tx_state.tx_id
));
finalized_transactions.push(tx_state);
}
}
// Verify the IN_PROGRESS transaction was finalized
assert_eq!(
finalized_transactions.len(),
1,
"IN_PROGRESS transaction should be finalized"
);
assert_eq!(finalized_transactions[0].tx_id, 1);
assert_eq!(
finalized_transactions[0].committed, false,
"Should remain uncommitted"
);
assert_eq!(
finalized_transactions[0].commit_lsn, None,
"Should have no commit LSN"
);
assert_eq!(warnings.len(), 1);
assert!(warnings[0].contains("Incomplete transaction TX 1 recovered"));
}
/// Test that TransactionState is correctly initialized for new transactions
///
/// This verifies the base state of IN_PROGRESS transactions.
#[test]
fn test_transaction_state_initialization() {
let tx_state = TransactionState {
tx_id: 42,
start_lsn: 100,
commit_lsn: None,
records: vec![],
committed: false, // IN_PROGRESS = not committed
timestamp: 1234567890,
};
// Verify IN_PROGRESS transaction state
assert_eq!(tx_state.tx_id, 42);
assert_eq!(tx_state.start_lsn, 100);
assert_eq!(tx_state.commit_lsn, None, "IN_PROGRESS has no commit LSN");
assert_eq!(tx_state.committed, false, "IN_PROGRESS is not committed");
assert_eq!(tx_state.records.len(), 0);
// Verify this transaction would be filtered out during replay
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(
!should_replay,
"IN_PROGRESS transactions should not be replayed"
);
}
/// Test committed transaction passes the filter
#[test]
fn test_committed_transaction_passes_filter() {
let tx_state = TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10), // Has commit LSN
records: vec![],
committed: true, // Explicitly committed
timestamp: 0,
};
// Verify committed transaction state
assert_eq!(tx_state.commit_lsn, Some(10));
assert_eq!(tx_state.committed, true);
// Verify this transaction would be included during replay
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(should_replay, "Committed transactions should be replayed");
}
/// Test multiple IN_PROGRESS transactions are all filtered
#[test]
fn test_multiple_in_progress_transactions_filtered() {
let transactions = vec![
TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10),
records: vec![],
committed: true,
timestamp: 0,
},
// Multiple IN_PROGRESS transactions
TransactionState {
tx_id: 2,
start_lsn: 11,
commit_lsn: None,
records: vec![],
committed: false,
timestamp: 0,
},
TransactionState {
tx_id: 3,
start_lsn: 21,
commit_lsn: None,
records: vec![],
committed: false,
timestamp: 0,
},
TransactionState {
tx_id: 4,
start_lsn: 31,
commit_lsn: Some(40),
records: vec![],
committed: true,
timestamp: 0,
},
];
let committed_transactions: Vec<_> = transactions
.iter()
.filter(|tx| tx.committed && tx.commit_lsn.is_some())
.collect();
assert_eq!(
committed_transactions.len(),
2,
"Only TX 1 and TX 4 should pass filter"
);
assert_eq!(committed_transactions[0].tx_id, 1);
assert_eq!(committed_transactions[1].tx_id, 4);
}
/// Test edge case: committed without commit_lsn (should be filtered)
#[test]
fn test_committed_without_commit_lsn_filtered() {
// Edge case: Transaction marked committed but no LSN
// This should be filtered out as it's likely incomplete
let tx_state = TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: None, // No commit LSN
records: vec![],
committed: true, // But marked as committed
timestamp: 0,
};
// The filter requires BOTH conditions
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(
!should_replay,
"Transactions without commit_lsn should not replay"
);
}
/// Test rollback transaction state
#[test]
fn test_rollback_transaction_state() {
let tx_state = TransactionState {
tx_id: 1,
start_lsn: 1,
commit_lsn: Some(10), // Has LSN (rollback record)
records: vec![],
committed: false, // Rolled back
timestamp: 0,
};
// Verify rolled back transaction is NOT replayed
let should_replay = tx_state.committed && tx_state.commit_lsn.is_some();
assert!(
!should_replay,
"Rolled back transactions should not be replayed"
);
}